From 37a7e2a4e2d57a2abbbc1e8d03f333a4bbca3bcd Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 13 May 2026 13:35:11 -0500 Subject: [PATCH 01/20] iteration 0 --- executor/executor/artifacts.py | 9 +- executor/executor/config.py | 6 + executor/executor/errors.py | 4 + executor/executor/executor.py | 296 +++++++++++++++++++++++++-------- 4 files changed, 247 insertions(+), 68 deletions(-) diff --git a/executor/executor/artifacts.py b/executor/executor/artifacts.py index 9af4b408..41d3f384 100644 --- a/executor/executor/artifacts.py +++ b/executor/executor/artifacts.py @@ -22,6 +22,13 @@ def __init__(self, name, path): "earthmover_results", "em-results.json" ) +# Only uploaded when a cross-year matching pass runs. Defaults to no-upload +# so the base case (single Earthmover run) doesn't try to upload it. +EM_RESULTS_X_YEAR = JobArtifact( + "earthmover_results_x_year", + "em-results-x-year.json" +) +EM_RESULTS_X_YEAR.needs_upload = False MATCH_RATES = JobArtifact( "student_id_match_rates", os.path.join(config.OUTPUT_DIR, ("student_id_match_rates.csv")), @@ -35,4 +42,4 @@ def __init__(self, name, path): "lb-send-results.json" ) -ALL = [ROSTER, EM_RESULTS, MATCH_RATES, UNMATCHED_STUDENTS, LB_SEND_RESULTS] \ No newline at end of file +ALL = [ROSTER, EM_RESULTS, EM_RESULTS_X_YEAR, MATCH_RATES, UNMATCHED_STUDENTS, LB_SEND_RESULTS] \ No newline at end of file diff --git a/executor/executor/config.py b/executor/executor/config.py index 0974072f..6f3b919c 100644 --- a/executor/executor/config.py +++ b/executor/executor/config.py @@ -1,6 +1,12 @@ +import os + BUNDLE_DIR = 'bundles' OUTPUT_DIR = 'output' +# Where the first (ODS) Earthmover run's output is stashed before the cross-year +# run reuses OUTPUT_DIR for its own output set. +ODS_OUTPUT_DIR = 'output-ods' ROSTER_DOWNLOAD_DIR = 'roster-download-dir' +CROSS_YEAR_ROSTER_PATH = os.path.join(ROSTER_DOWNLOAD_DIR, 'cross_year_roster.jsonl') REQUIRED_ID_MATCH_RATE = 0.5 STUDENT_ASSESSMENT_FAIL_THRESHOLD = 0.75 diff --git a/executor/executor/errors.py b/executor/executor/errors.py index 25742ee2..ed819fff 100644 --- a/executor/executor/errors.py +++ b/executor/executor/errors.py @@ -80,6 +80,10 @@ class EarthmoverRunError(ExecutorError): def __init__(self, stacktrace=None): super().__init__("earthmover_run", stacktrace) +class CrossYearRosterFetchError(ExecutorError): + def __init__(self, stacktrace=None): + super().__init__("cross_year_roster_fetch", stacktrace) + class InsufficientMatchesError(ExecutorError): def __init__(self, match_rate, match_threshold, id_name, id_type, stacktrace=None): self.match_rate = match_rate diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 39f71c4f..f105da1d 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -44,6 +44,7 @@ def __init__(self): self.action = "" self.error = None self.summary = {} + self.cross_year_pass_ran = False self.timeout_seconds = int(os.environ.get("TIMEOUT_SECONDS")) @@ -63,6 +64,9 @@ def __init__(self): shutil.rmtree(".lightbeam", ignore_errors=True) self.output_dir = os.path.abspath(config.OUTPUT_DIR) + # Used only by the two-pass (ODS + cross-year) flow; the first run's output + # gets stashed here so the cross-year run can reuse OUTPUT_DIR. + self.ods_output_dir = os.path.abspath(config.ODS_OUTPUT_DIR) os.mkdir(self.output_dir) os.environ["DATA_DIR"] = self.output_dir os.environ["OUTPUT_DIR"] = self.output_dir @@ -114,6 +118,13 @@ def execute(self): self.map_descriptors() self.earthmover_run() + self.enforce_match_threshold() + + if self.should_run_cross_year_pass(): + self.cross_year_pass() + + self.upload_artifact(artifact.MATCH_RATES) + self.report_unmatched_students() if self.send_to_ods: self.lightbeam_send() @@ -158,6 +169,9 @@ def unpack_job(self, job): self.output_files_url = job["appUrls"]["outputFiles"] self.send_to_ods = job.get("sendToOds", True) + self.cross_year_match_available = job.get("crossYearMatchAvailable", False) + if self.cross_year_match_available: + self.cross_year_roster_url = job["appUrls"]["crossYearRoster"] self.assessment_project = os.path.join( self.wrapper_project, "packages", *job["bundle"]["path"].split("/")[1:] @@ -172,8 +186,10 @@ def unpack_job(self, job): if not self.send_to_ods: self.logger.info("this job is not sending Earthmover output set to an ODS") artifact.LB_SEND_RESULTS.needs_upload = False - # if bypassing the ODS, a roster file is required, for now - self.roster_file_path = job["rosterFilePath"] + # if bypassing the ODS without cross-year matching, a file roster is required. + # Cross-year jobs source the roster from the app instead. + if not self.cross_year_match_available: + self.roster_file_path = job["rosterFilePath"] else: # Note that we always need the API_YEAR env var set in order to run Earthmover. # We also use it in cases when we are using an ODS - in such cases the year used by EM @@ -260,11 +276,34 @@ def get_student_roster(self): if self.send_to_ods: self.get_roster_from_ods() + elif self.cross_year_match_available: + # Sideloaded year + cross-year matching: skip the file roster entirely and + # use the streamed cross-year roster as the (single) roster for this run. + self.get_roster_from_cross_year_endpoint(artifact.ROSTER.path) else: self.get_roster_from_s3() self.upload_artifact(artifact.ROSTER) + def get_roster_from_cross_year_endpoint(self, dest_path): + """Stream the app's cross-year roster into a JSONL file at dest_path.""" + self.logger.info(f"streaming cross-year roster from {self.cross_year_roster_url}") + os.makedirs(os.path.dirname(os.path.abspath(dest_path)), exist_ok=True) + try: + with self.conn.get(self.cross_year_roster_url, stream=True) as resp: + resp.raise_for_status() + with open(dest_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=64 * 1024): + if chunk: + f.write(chunk) + except requests.exceptions.RequestException: + self.error = error.CrossYearRosterFetchError() + raise + + if os.stat(dest_path).st_size == 0: + self.error = error.MissingOdsRosterError() + raise ValueError("Cross-year roster is empty") + def get_roster_from_ods(self): """Fetch student roster from the ODS via lightbeam""" try: @@ -386,11 +425,27 @@ def earthmover_run(self): self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") self.check_input_encoding() + self._run_earthmover(artifact.EM_RESULTS.path) + self.upload_artifact(artifact.EM_RESULTS) + + def _run_earthmover(self, results_path, encoding_override=None): + """Compile and run Earthmover into the given results file. + + Records the encoding that ultimately succeeded as self.successful_encoding so + subsequent runs over the same input file can skip re-detection. + + When encoding_override is provided (e.g. for a follow-on cross-year run), the + encoding-detection retry path is skipped entirely; we trust the caller. + """ fatal = False + em = None try: encoding_mod = [] - if self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: + if encoding_override: + self.logger.info(f"using provided input encoding: {encoding_override}") + encoding_mod.extend(["--set", "sources.input.encoding", encoding_override]) + elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: # if chardet identified an encoding that we think is plausible, use it. # Otherwise, do nothing and default to UTF-8 for the first attempt encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) @@ -402,7 +457,7 @@ def earthmover_run(self): ).check_returncode() # attempt no. 1 - cmd = ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", artifact.EM_RESULTS.path] + cmd = ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", results_path] cmd.extend(encoding_mod) em = subprocess.run( cmd, @@ -415,22 +470,36 @@ def earthmover_run(self): if em.stderr: self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() + + # remember whatever encoding (if any) we just used; a cross-year follow-on + # run will reuse it rather than re-detecting + if encoding_override: + self.successful_encoding = encoding_override + elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: + self.successful_encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) + else: + self.successful_encoding = None except subprocess.CalledProcessError as err: self.logger.warning("earthmover encountered an error") fatal = True # yes it's brittle to check the error against a string like this, but this message hasn't # changed since 2007(!) -> https://github.com/python/cpython/blame/main/Objects/exceptions.c - if err.stderr and "codec can't decode" in err.stderr and self.input_sources["INPUT_FILE"]["encoding"] != "ISO-8859-1": + if ( + not encoding_override + and err.stderr and "codec can't decode" in err.stderr + and self.input_sources["INPUT_FILE"]["encoding"] != "ISO-8859-1" + ): self.logger.error(f"Failed to read file with {self.input_sources['INPUT_FILE']['encoding']} encoding. Retrying with Latin1...") try: # attempt no. 2 - need a new em object to overwrite the decoding error em = subprocess.run( - ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", artifact.EM_RESULTS.path, "--set", "sources.input.encoding", "iso-8859-1"], + ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", results_path, "--set", "sources.input.encoding", "iso-8859-1"], ) em.check_returncode() fatal = False # if we made it this far, we can abort the shutdown + self.successful_encoding = "iso-8859-1" except subprocess.CalledProcessError: # failed again, move on to shutdown procedure pass @@ -445,13 +514,54 @@ def earthmover_run(self): # shut it down self.error = error.EarthmoverRunError() # generic exception that will be caught, with em.stderr reported as the stacktrace - raise Exception(em.stderr) + raise Exception(em.stderr if em else "earthmover failed before invocation") - self.upload_artifact(artifact.EM_RESULTS) - self.upload_artifact(artifact.MATCH_RATES) + def should_run_cross_year_pass(self): + """Should we follow up the primary ODS run with a cross-year matching pass? + + Only relevant when sending to an ODS: sideloaded-year jobs that have cross-year + matching available already use the cross-year roster for their single run. + """ + return ( + self.send_to_ods + and self.cross_year_match_available + and bool(self.num_unmatched_students) + ) - # If we reach this point, it's likely that the input file was basically compatible with the bundle - self.report_unmatched_students() + def cross_year_pass(self): + """Run a second Earthmover pass against a cross-year roster to pick up unmatched students. + + Stashes the first run's output set under ODS_OUTPUT_DIR so the second run can write + fresh files into OUTPUT_DIR. The second run is constrained to the ID column/type that + won in the first run so the unmatched-students UI keeps presenting a single ID. + """ + # Capture first-run match info before _run_earthmover overwrites it. + first_run_id_name = self.highest_match_id_name + first_run_id_type = self.highest_match_id_type + + os.rename(self.output_dir, self.ods_output_dir) + os.mkdir(self.output_dir) + + self.get_roster_from_cross_year_endpoint(config.CROSS_YEAR_ROSTER_PATH) + os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) + + # Constrain to the ID column the first run matched on. The bundle always appends + # studentUniqueId internally, so we pass an empty list when that's what won. + os.environ["POSSIBLE_STUDENT_ID_COLUMNS"] = first_run_id_name + os.environ["EDFI_STUDENT_ID_TYPES"] = ( + "" if first_run_id_type == "studentUniqueId" else first_run_id_type + ) + self.logger.info( + f"cross-year pass: matching on {first_run_id_name} / {first_run_id_type}" + ) + + self._run_earthmover( + artifact.EM_RESULTS_X_YEAR.path, + encoding_override=self.successful_encoding, + ) + artifact.EM_RESULTS_X_YEAR.needs_upload = True + self.upload_artifact(artifact.EM_RESULTS_X_YEAR) + self.cross_year_pass_ran = True def check_input_encoding(self): """Determine whether assessment file should be loaded with a non-UTF-8 encoding""" @@ -613,38 +723,56 @@ def record_highest_match_rate(self): self.logger.debug("no students matched any ID. Skipping upload of unmatched students file") artifact.UNMATCHED_STUDENTS.needs_upload = False + def enforce_match_threshold(self): + """Halt if the primary Earthmover run's best match rate is below the configured threshold. + + Sub-threshold matches almost always indicate the wrong file was uploaded; in that case + we want to abort before doing any further work (including a cross-year second pass) + and surface a clear error to the user. + """ + if not self.num_unmatched_students: + return + if self.highest_match_rate >= config.REQUIRED_ID_MATCH_RATE: + return + + self.error = error.InsufficientMatchesError( + self.highest_match_rate, config.REQUIRED_ID_MATCH_RATE, + self.highest_match_id_name, self.highest_match_id_type, + ) + # For now, since we're asking the user to revisit their entire file, it's simpler + # if we don't return the unmatched students file at all + self.logger.debug("too many unmatched students. Skipping upload") + artifact.UNMATCHED_STUDENTS.needs_upload = False + raise ValueError( + f"insufficient ID matches to continue " + f"(highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; " + f"ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})" + ) + def report_unmatched_students(self): - """Alert the app to the existence of unmatched students (if any) and the best candidate for ID matching""" - if self.num_unmatched_students == 0: + """Alert the app to any unmatched students that remain after all Earthmover passes.""" + if not self.num_unmatched_students: return self.logger.warning('earthmover run failed to match some student IDs') - if self.highest_match_rate >= config.REQUIRED_ID_MATCH_RATE: - # in this case, there are unmatched students but not so many that we doubt the - # integrity of the file. Send the ones that match and give the rest back to the user - # if an "actual" ID is replicated as studentUniqueId, we should send the actual ID to the user - id_type_to_report = self.highest_match_id_type - if self.highest_match_id_type == "studentUniqueId" and self.stu_unique_id_in_roster: - id_type_to_report = self.stu_unique_id_in_roster - - # additional context so the app can help the user fix their file - # in this case, num_unmatched_students is guaranteed to be an int instead of None - self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) - self.upload_artifact(artifact.UNMATCHED_STUDENTS) - else: - # Insufficient matches. Assume the input file is no good Don't bother uploading anything. - # Instead, alert the user with an error - self.error = error.InsufficientMatchesError(self.highest_match_rate, config.REQUIRED_ID_MATCH_RATE, self.highest_match_id_name, self.highest_match_id_type) - # For now, since we're asking the user to revisit their entire file, it's simpler if we don't - # return the unmatched students file at all - self.logger.debug("too many unmatched students. Skipping upload") - artifact.UNMATCHED_STUDENTS.needs_upload = False - raise ValueError(f"insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") + # if an "actual" ID is replicated as studentUniqueId, we should send the actual ID to the user + id_type_to_report = self.highest_match_id_type + if self.highest_match_id_type == "studentUniqueId" and self.stu_unique_id_in_roster: + id_type_to_report = self.stu_unique_id_in_roster + + # additional context so the app can help the user fix their file + # in this case, num_unmatched_students is guaranteed to be an int instead of None + self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) + self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): """Upload Earthmover's outputs to the ODS""" self.set_action(action.LIGHTBEAM_SEND) + if self.cross_year_pass_ran: + # The records destined for the ODS came from the first Earthmover run and + # now live in the stashed output dir; point lightbeam there. + os.environ["DATA_DIR"] = self.ods_output_dir try: subprocess.run( ["lightbeam", "-c", self.assessment_lightbeam, "send", "--set", "state_dir", ".lightbeam", "--results-file", artifact.LB_SEND_RESULTS.path] @@ -658,46 +786,80 @@ def lightbeam_send(self): self.upload_artifact(artifact.LB_SEND_RESULTS) def compile_summary(self): - """Post results from job to share with the user""" + """Post results from job to share with the user. + + In a two-pass (ODS + cross-year) run, fold the cross-year records into the + same per-resource counts the lightbeam-send results produced — the user sees + a single combined picture. + """ if self.send_to_ods: - lb_send_results = {} - with open(artifact.LB_SEND_RESULTS.path) as f: - lb_send_results = json.load(f) - - required_counts = ["records_processed", "records_skipped", "records_failed"] - for resource, counts in lb_send_results["resources"].items(): - # grab the counts we care about - self.summary[resource] = { - key: val for key, val in counts.items() if key in required_counts - } + self._merge_into_summary(self._summary_from_lb_send(artifact.LB_SEND_RESULTS.path)) + if self.cross_year_pass_ran: + self._merge_into_summary(self._summary_from_em_results(artifact.EM_RESULTS_X_YEAR.path)) else: - # no lightbeam send, use Earthmover stats for now - with open(artifact.EM_RESULTS.path) as f: - em_results = json.load(f) - - dest_prefix = "$destinations." - for key, count in em_results.get("row_counts", {}).items(): - if key.startswith(dest_prefix): - resource = key[len(dest_prefix):] - self.summary[resource] = { - "records_processed": count, - "records_skipped": 0, - "records_failed": 0, - } + self._merge_into_summary(self._summary_from_em_results(artifact.EM_RESULTS.path)) if self.summary: self.send_job_summary() + def _summary_from_lb_send(self, results_path): + """Per-resource processed/skipped/failed counts from a lightbeam-send results file.""" + with open(results_path) as f: + lb_send_results = json.load(f) + required_counts = ["records_processed", "records_skipped", "records_failed"] + return { + resource: {k: v for k, v in counts.items() if k in required_counts} + for resource, counts in lb_send_results["resources"].items() + } + + def _summary_from_em_results(self, results_path): + """Per-resource processed counts from an Earthmover results file. + + Used when records are sideloaded rather than sent to an ODS — Earthmover row + counts are the best signal we have for those. + """ + with open(results_path) as f: + em_results = json.load(f) + dest_prefix = "$destinations." + out = {} + for key, count in em_results.get("row_counts", {}).items(): + if key.startswith(dest_prefix): + resource = key[len(dest_prefix):] + out[resource] = { + "records_processed": count, + "records_skipped": 0, + "records_failed": 0, + } + return out + + def _merge_into_summary(self, partial): + for resource, counts in partial.items(): + target = self.summary.setdefault(resource, {}) + for key, value in counts.items(): + target[key] = target.get(key, 0) + value + def upload_output(self): - """Upload Earthmover output set to S3 and notify the app of their location""" + """Upload Earthmover output set(s) to S3 and notify the app of their location(s). + + In a two-pass run, the first run's output set (sent to the ODS) lives in the + stash dir and the cross-year run's set (sideloaded) lives in OUTPUT_DIR. We + upload them to separate S3 subdirs and send one alert per non-empty set. + """ self.set_action(action.UPLOAD_OUTPUT) - output_type = "ods" if self.send_to_ods else "non-ods" - s3_subdir = f"{self.s3_out_path}/{output_type}" + if self.cross_year_pass_ran: + self._upload_output_set(self.ods_output_dir, "ods", sent_to_ods=True) + self._upload_output_set(self.output_dir, "non-ods", sent_to_ods=False) + else: + output_type = "ods" if self.send_to_ods else "non-ods" + self._upload_output_set(self.output_dir, output_type, sent_to_ods=self.send_to_ods) + def _upload_output_set(self, src_dir, output_type, sent_to_ods): + """Upload all non-empty files in src_dir to // and alert the app.""" + s3_subdir = f"{self.s3_out_path}/{output_type}" uploaded = False - for fname in os.listdir(self.output_dir): - fpath = os.path.join(self.output_dir, fname) + for fname in os.listdir(src_dir): + fpath = os.path.join(src_dir, fname) if not os.path.isfile(fpath) or os.stat(fpath).st_size == 0: continue @@ -711,10 +873,10 @@ def upload_output(self): uploaded = True if not uploaded: - self.logger.warning("no earthmover output files found to upload") + self.logger.warning(f"no output files found in {src_dir} to upload") return - self.send_job_output_alert(s3_subdir) + self.send_job_output_alert(s3_subdir, sent_to_ods) def upload_remaining_artifacts(self): """Attempt to upload all artifacts that have not yet been uploaded""" @@ -813,11 +975,11 @@ def send_job_summary(self): self.logger.debug(f"Sending summary") self.conn.post(self.summary_url, json=self.summary) - def send_job_output_alert(self, s3_prefix): + def send_job_output_alert(self, s3_prefix, sent_to_ods): """Notify the app that an Earthmover output set has been uploaded to S3""" self.logger.debug(f"Notifying app of output set at {s3_prefix}") self.conn.post(self.output_files_url, json={ - "sentToOds": self.send_to_ods, + "sentToOds": sent_to_ods, "path": s3_prefix, }) From 1706e3fba91b64ee6e96aef9b43ee2f2df99f0fb Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 13 May 2026 14:23:36 -0500 Subject: [PATCH 02/20] wip --- executor/executor/artifacts.py | 11 +++++------ executor/executor/config.py | 8 ++------ executor/executor/executor.py | 28 +++++++++++++++++----------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/executor/executor/artifacts.py b/executor/executor/artifacts.py index 41d3f384..5faa21de 100644 --- a/executor/executor/artifacts.py +++ b/executor/executor/artifacts.py @@ -8,10 +8,9 @@ import executor.config as config class JobArtifact: - def __init__(self, name, path): + def __init__(self, name, path, needs_upload=True): self.name = name self.path = path - self.needs_upload = True ROSTER = JobArtifact( @@ -22,13 +21,13 @@ def __init__(self, name, path): "earthmover_results", "em-results.json" ) -# Only uploaded when a cross-year matching pass runs. Defaults to no-upload -# so the base case (single Earthmover run) doesn't try to upload it. +# Only generated when earthmover runs for a second time as part of cross-year ID matching. +# If a cross-year match is performed as part of a sideload-only job, EM_RESULTS is used alone EM_RESULTS_X_YEAR = JobArtifact( "earthmover_results_x_year", - "em-results-x-year.json" + "em-results-x-year.json", + False ) -EM_RESULTS_X_YEAR.needs_upload = False MATCH_RATES = JobArtifact( "student_id_match_rates", os.path.join(config.OUTPUT_DIR, ("student_id_match_rates.csv")), diff --git a/executor/executor/config.py b/executor/executor/config.py index 6f3b919c..fa55e218 100644 --- a/executor/executor/config.py +++ b/executor/executor/config.py @@ -1,12 +1,8 @@ -import os - BUNDLE_DIR = 'bundles' OUTPUT_DIR = 'output' -# Where the first (ODS) Earthmover run's output is stashed before the cross-year -# run reuses OUTPUT_DIR for its own output set. -ODS_OUTPUT_DIR = 'output-ods' +OUTPUT_DIR_FIRST_RUN = 'output-first-run' # TODO: ROSTER_DOWNLOAD_DIR = 'roster-download-dir' -CROSS_YEAR_ROSTER_PATH = os.path.join(ROSTER_DOWNLOAD_DIR, 'cross_year_roster.jsonl') +CROSS_YEAR_ROSTER_PATH = 'cross_year_roster.jsonl' REQUIRED_ID_MATCH_RATE = 0.5 STUDENT_ASSESSMENT_FAIL_THRESHOLD = 0.75 diff --git a/executor/executor/executor.py b/executor/executor/executor.py index f105da1d..b32f33b0 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -44,7 +44,7 @@ def __init__(self): self.action = "" self.error = None self.summary = {} - self.cross_year_pass_ran = False + self.cross_year_pass_ran = False # FIXME: not a fan of this construction self.timeout_seconds = int(os.environ.get("TIMEOUT_SECONDS")) @@ -64,9 +64,10 @@ def __init__(self): shutil.rmtree(".lightbeam", ignore_errors=True) self.output_dir = os.path.abspath(config.OUTPUT_DIR) + # TODO: only want to set this when relevant # Used only by the two-pass (ODS + cross-year) flow; the first run's output # gets stashed here so the cross-year run can reuse OUTPUT_DIR. - self.ods_output_dir = os.path.abspath(config.ODS_OUTPUT_DIR) + self.ods_output_dir = os.path.abspath(config.OUTPUT_DIR_FIRST_RUN) os.mkdir(self.output_dir) os.environ["DATA_DIR"] = self.output_dir os.environ["OUTPUT_DIR"] = self.output_dir @@ -123,7 +124,7 @@ def execute(self): if self.should_run_cross_year_pass(): self.cross_year_pass() - self.upload_artifact(artifact.MATCH_RATES) + self.upload_artifact(artifact.MATCH_RATES) # FIXME: blegh self.report_unmatched_students() if self.send_to_ods: @@ -186,6 +187,7 @@ def unpack_job(self, job): if not self.send_to_ods: self.logger.info("this job is not sending Earthmover output set to an ODS") artifact.LB_SEND_RESULTS.needs_upload = False + # TODO: # if bypassing the ODS without cross-year matching, a file roster is required. # Cross-year jobs source the roster from the app instead. if not self.cross_year_match_available: @@ -274,21 +276,23 @@ def get_student_roster(self): """Download a list of students so they can be used to match IDs""" self.set_action(action.GET_ROSTER) + # TODO: better documentation if self.send_to_ods: self.get_roster_from_ods() elif self.cross_year_match_available: # Sideloaded year + cross-year matching: skip the file roster entirely and # use the streamed cross-year roster as the (single) roster for this run. - self.get_roster_from_cross_year_endpoint(artifact.ROSTER.path) + self.get_roster_from_cross_year_endpoint() else: self.get_roster_from_s3() self.upload_artifact(artifact.ROSTER) - def get_roster_from_cross_year_endpoint(self, dest_path): - """Stream the app's cross-year roster into a JSONL file at dest_path.""" + def get_roster_from_cross_year_endpoint(self): + """Stream the app's cross-year roster into a JSONL file""" self.logger.info(f"streaming cross-year roster from {self.cross_year_roster_url}") - os.makedirs(os.path.dirname(os.path.abspath(dest_path)), exist_ok=True) + dest_path = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) + # FIXME: would be good to retry this a couple of times in case the stream is interrupted try: with self.conn.get(self.cross_year_roster_url, stream=True) as resp: resp.raise_for_status() @@ -301,7 +305,7 @@ def get_roster_from_cross_year_endpoint(self, dest_path): raise if os.stat(dest_path).st_size == 0: - self.error = error.MissingOdsRosterError() + self.error = error.CrossYearRosterFetchError() raise ValueError("Cross-year roster is empty") def get_roster_from_ods(self): @@ -428,6 +432,7 @@ def earthmover_run(self): self._run_earthmover(artifact.EM_RESULTS.path) self.upload_artifact(artifact.EM_RESULTS) + # TODO: results_path... def _run_earthmover(self, results_path, encoding_override=None): """Compile and run Earthmover into the given results file. @@ -438,7 +443,6 @@ def _run_earthmover(self, results_path, encoding_override=None): encoding-detection retry path is skipped entirely; we trust the caller. """ fatal = False - em = None try: encoding_mod = [] @@ -452,7 +456,7 @@ def _run_earthmover(self, results_path, encoding_override=None): self.logger.info(f"setting input encoding to {encoding}") encoding_mod.extend(["--set", "sources.input.encoding", encoding]) - subprocess.run( + em = subprocess.run( ["earthmover", "-c", self.wrapper_earthmover, "compile"] ).check_returncode() @@ -471,6 +475,7 @@ def _run_earthmover(self, results_path, encoding_override=None): self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() + # FIXME: don't love this construction, although it has a nice effect downstream # remember whatever encoding (if any) we just used; a cross-year follow-on # run will reuse it rather than re-detecting if encoding_override: @@ -514,8 +519,9 @@ def _run_earthmover(self, results_path, encoding_override=None): # shut it down self.error = error.EarthmoverRunError() # generic exception that will be caught, with em.stderr reported as the stacktrace - raise Exception(em.stderr if em else "earthmover failed before invocation") + raise Exception(em.stderr) + # TODO: dunno def should_run_cross_year_pass(self): """Should we follow up the primary ODS run with a cross-year matching pass? From 1330dd9e1711462947d8f121ab51de93e1fa1bbb Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 13 May 2026 18:31:04 -0500 Subject: [PATCH 03/20] iteration 1 --- executor/executor/config.py | 3 +- executor/executor/executor.py | 318 +++++++++++++++------------------- 2 files changed, 141 insertions(+), 180 deletions(-) diff --git a/executor/executor/config.py b/executor/executor/config.py index fa55e218..469d9054 100644 --- a/executor/executor/config.py +++ b/executor/executor/config.py @@ -1,6 +1,7 @@ BUNDLE_DIR = 'bundles' OUTPUT_DIR = 'output' -OUTPUT_DIR_FIRST_RUN = 'output-first-run' # TODO: +# when performing a cross-year-matching second pass: location to move earthmover output set from the first pass +OUTPUT_DIR_FIRST_RUN = 'output-first-run' ROSTER_DOWNLOAD_DIR = 'roster-download-dir' CROSS_YEAR_ROSTER_PATH = 'cross_year_roster.jsonl' diff --git a/executor/executor/executor.py b/executor/executor/executor.py index b32f33b0..e50ce08b 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -21,6 +21,7 @@ import executor.artifacts as artifact import executor.config as config import executor.errors as error +from executor.output_sets import OutputSet handler = logging.StreamHandler() _formatter = logging.Formatter( @@ -44,7 +45,6 @@ def __init__(self): self.action = "" self.error = None self.summary = {} - self.cross_year_pass_ran = False # FIXME: not a fan of this construction self.timeout_seconds = int(os.environ.get("TIMEOUT_SECONDS")) @@ -64,10 +64,6 @@ def __init__(self): shutil.rmtree(".lightbeam", ignore_errors=True) self.output_dir = os.path.abspath(config.OUTPUT_DIR) - # TODO: only want to set this when relevant - # Used only by the two-pass (ODS + cross-year) flow; the first run's output - # gets stashed here so the cross-year run can reuse OUTPUT_DIR. - self.ods_output_dir = os.path.abspath(config.OUTPUT_DIR_FIRST_RUN) os.mkdir(self.output_dir) os.environ["DATA_DIR"] = self.output_dir os.environ["OUTPUT_DIR"] = self.output_dir @@ -118,17 +114,9 @@ def execute(self): self.get_input_files() self.map_descriptors() - self.earthmover_run() - self.enforce_match_threshold() + self.orchestrate_earthmover() - if self.should_run_cross_year_pass(): - self.cross_year_pass() - - self.upload_artifact(artifact.MATCH_RATES) # FIXME: blegh - self.report_unmatched_students() - - if self.send_to_ods: - self.lightbeam_send() + self.lightbeam_send() self.upload_output() self.compile_summary() @@ -172,7 +160,7 @@ def unpack_job(self, job): self.send_to_ods = job.get("sendToOds", True) self.cross_year_match_available = job.get("crossYearMatchAvailable", False) if self.cross_year_match_available: - self.cross_year_roster_url = job["appUrls"]["crossYearRoster"] + self.cross_year_roster_url = job["appUrls"]["roster"] self.assessment_project = os.path.join( self.wrapper_project, "packages", *job["bundle"]["path"].split("/")[1:] @@ -187,10 +175,8 @@ def unpack_job(self, job): if not self.send_to_ods: self.logger.info("this job is not sending Earthmover output set to an ODS") artifact.LB_SEND_RESULTS.needs_upload = False - # TODO: - # if bypassing the ODS without cross-year matching, a file roster is required. - # Cross-year jobs source the roster from the app instead. if not self.cross_year_match_available: + # If bypassing the ODS and not pulling from EDU, require that a roster file exists in S3 self.roster_file_path = job["rosterFilePath"] else: # Note that we always need the API_YEAR env var set in order to run Earthmover. @@ -273,33 +259,30 @@ def modify_local_lightbeam(self): ) def get_student_roster(self): - """Download a list of students so they can be used to match IDs""" + """Download a list of students so they can be used to match IDs in the initial Earthmover run""" self.set_action(action.GET_ROSTER) - # TODO: better documentation if self.send_to_ods: + # Case 1: initially attempt to match on current year; + # leave open the cross-year option if we take another pass self.get_roster_from_ods() elif self.cross_year_match_available: - # Sideloaded year + cross-year matching: skip the file roster entirely and - # use the streamed cross-year roster as the (single) roster for this run. - self.get_roster_from_cross_year_endpoint() + # Case 2: not sending to this year's ODS but we have access to EDU; + # only running Earthmover once with cross-year roster + self.get_roster_from_edu() else: + # Case 3: not sending to this year's ODS and EDU is unavailable; + # only running Earthmover once with uploaded roster self.get_roster_from_s3() self.upload_artifact(artifact.ROSTER) - def get_roster_from_cross_year_endpoint(self): - """Stream the app's cross-year roster into a JSONL file""" + def get_roster_from_edu(self): + """Query EDU via the Runway app and load cross-year roster data into a JSONL file""" self.logger.info(f"streaming cross-year roster from {self.cross_year_roster_url}") dest_path = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) - # FIXME: would be good to retry this a couple of times in case the stream is interrupted try: - with self.conn.get(self.cross_year_roster_url, stream=True) as resp: - resp.raise_for_status() - with open(dest_path, "wb") as f: - for chunk in resp.iter_content(chunk_size=64 * 1024): - if chunk: - f.write(chunk) + stream_to_file(self.conn, self.cross_year_roster_url, dest_path) except requests.exceptions.RequestException: self.error = error.CrossYearRosterFetchError() raise @@ -420,49 +403,67 @@ def map_descriptors(self): self.logger.debug(f"Mapped {len(descriptors)} {name} values") - def earthmover_run(self): - """Run an Earthmover bundle to transform assessment data""" + def orchestrate_earthmover(self): + """Run an Earthmover bundle to transform assessment data. + + Conditionally run Earthmover a second time on input records that initially failed to match + """ self.set_action(action.EARTHMOVER_RUN) + # first pass, possibly aborting if there are too few matches self.unpack_id_types() os.environ["EDFI_STUDENT_ID_TYPES"] = ",".join(self.distinct_id_types) self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") self.check_input_encoding() - self._run_earthmover(artifact.EM_RESULTS.path) + self.earthmover_run(artifact.EM_RESULTS.path) self.upload_artifact(artifact.EM_RESULTS) + self.enforce_match_threshold() + + self.output_sets = [OutputSet( + local_dir=self.output_dir, + s3_subdir="ods" if self.send_to_ods else "non-ods", + sent_to_ods=self.send_to_ods, + em_results_path=artifact.EM_RESULTS.path, + lb_send_results_path=artifact.LB_SEND_RESULTS.path if self.send_to_ods else None, + )] + if (self.send_to_ods # i.e. we've only tried matching this year's students so far + and self.cross_year_match_available # and we have access to EDU + and bool(self.num_unmatched_students) # and there are unmatched students from the first run + ): + # then take a second pass with the cross-year roster from EDU + # and thus produce a second output set to be sideloaded + cross_year_output = self.cross_year_pass(self.output_sets[0]) + self.output_sets.append(cross_year_output) + + self.upload_artifact(artifact.MATCH_RATES) + self.report_unmatched_students() + + def earthmover_run(self, results_path, encoding_override=None): + """Compile and run Earthmover into the given results directory.""" + if encoding_override: + # Case 1: second pass, so we know what encoding already worked + encoding = encoding_override + # Case 2: first pass and chardet identified an encoding we think is plausible, so use it + elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: + encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) + else: + # Case 3: first pass and we don't have a good guess so just use UTF-8 + encoding = None + encoding_args = ["--set", "sources.input.encoding", encoding] if encoding else [] + if encoding: + self.logger.info(f"using input encoding: {encoding}") - # TODO: results_path... - def _run_earthmover(self, results_path, encoding_override=None): - """Compile and run Earthmover into the given results file. - - Records the encoding that ultimately succeeded as self.successful_encoding so - subsequent runs over the same input file can skip re-detection. - - When encoding_override is provided (e.g. for a follow-on cross-year run), the - encoding-detection retry path is skipped entirely; we trust the caller. - """ fatal = False - try: - encoding_mod = [] - if encoding_override: - self.logger.info(f"using provided input encoding: {encoding_override}") - encoding_mod.extend(["--set", "sources.input.encoding", encoding_override]) - elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: - # if chardet identified an encoding that we think is plausible, use it. - # Otherwise, do nothing and default to UTF-8 for the first attempt - encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) - self.logger.info(f"setting input encoding to {encoding}") - encoding_mod.extend(["--set", "sources.input.encoding", encoding]) - em = subprocess.run( ["earthmover", "-c", self.wrapper_earthmover, "compile"] - ).check_returncode() + ) + em.check_returncode() # attempt no. 1 cmd = ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", results_path] - cmd.extend(encoding_mod) + cmd.extend(encoding_args) em = subprocess.run( cmd, capture_output=True, @@ -475,15 +476,7 @@ def _run_earthmover(self, results_path, encoding_override=None): self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() - # FIXME: don't love this construction, although it has a nice effect downstream - # remember whatever encoding (if any) we just used; a cross-year follow-on - # run will reuse it rather than re-detecting - if encoding_override: - self.successful_encoding = encoding_override - elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: - self.successful_encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) - else: - self.successful_encoding = None + self.successful_encoding = encoding except subprocess.CalledProcessError as err: self.logger.warning("earthmover encountered an error") fatal = True @@ -491,11 +484,11 @@ def _run_earthmover(self, results_path, encoding_override=None): # yes it's brittle to check the error against a string like this, but this message hasn't # changed since 2007(!) -> https://github.com/python/cpython/blame/main/Objects/exceptions.c if ( - not encoding_override + not encoding_override # i.e. only try again if this is the first pass and we're still uncertain about the encoding and err.stderr and "codec can't decode" in err.stderr - and self.input_sources["INPUT_FILE"]["encoding"] != "ISO-8859-1" + and encoding != "iso-8859-1" ): - self.logger.error(f"Failed to read file with {self.input_sources['INPUT_FILE']['encoding']} encoding. Retrying with Latin1...") + self.logger.error(f"Failed to read file with {encoding} encoding. Retrying with Latin1...") try: # attempt no. 2 - need a new em object to overwrite the decoding error em = subprocess.run( @@ -521,34 +514,18 @@ def _run_earthmover(self, results_path, encoding_override=None): # generic exception that will be caught, with em.stderr reported as the stacktrace raise Exception(em.stderr) - # TODO: dunno - def should_run_cross_year_pass(self): - """Should we follow up the primary ODS run with a cross-year matching pass? - - Only relevant when sending to an ODS: sideloaded-year jobs that have cross-year - matching available already use the cross-year roster for their single run. - """ - return ( - self.send_to_ods - and self.cross_year_match_available - and bool(self.num_unmatched_students) - ) - - def cross_year_pass(self): - """Run a second Earthmover pass against a cross-year roster to pick up unmatched students. - - Stashes the first run's output set under ODS_OUTPUT_DIR so the second run can write - fresh files into OUTPUT_DIR. The second run is constrained to the ID column/type that - won in the first run so the unmatched-students UI keeps presenting a single ID. - """ - # Capture first-run match info before _run_earthmover overwrites it. + def cross_year_pass(self, primary): + """Run a second Earthmover pass against a cross-year roster in an attempt to match more students.""" + # Capture first-run match info before earthmover_run overwrites it. first_run_id_name = self.highest_match_id_name first_run_id_type = self.highest_match_id_type - os.rename(self.output_dir, self.ods_output_dir) + first_run_output_dir = os.path.abspath(config.OUTPUT_DIR_FIRST_RUN) + os.rename(self.output_dir, first_run_output_dir) + primary.local_dir = first_run_output_dir os.mkdir(self.output_dir) - self.get_roster_from_cross_year_endpoint(config.CROSS_YEAR_ROSTER_PATH) + self.get_roster_from_edu() os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) # Constrain to the ID column the first run matched on. The bundle always appends @@ -561,13 +538,19 @@ def cross_year_pass(self): f"cross-year pass: matching on {first_run_id_name} / {first_run_id_type}" ) - self._run_earthmover( + self.earthmover_run( artifact.EM_RESULTS_X_YEAR.path, encoding_override=self.successful_encoding, ) artifact.EM_RESULTS_X_YEAR.needs_upload = True self.upload_artifact(artifact.EM_RESULTS_X_YEAR) - self.cross_year_pass_ran = True + + return OutputSet( + local_dir=self.output_dir, + s3_subdir="non-ods", + sent_to_ods=False, + em_results_path=artifact.EM_RESULTS_X_YEAR.path, + ) def check_input_encoding(self): """Determine whether assessment file should be loaded with a non-UTF-8 encoding""" @@ -749,14 +732,14 @@ def enforce_match_threshold(self): # if we don't return the unmatched students file at all self.logger.debug("too many unmatched students. Skipping upload") artifact.UNMATCHED_STUDENTS.needs_upload = False - raise ValueError( - f"insufficient ID matches to continue " - f"(highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; " - f"ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})" - ) + raise ValueError("insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") def report_unmatched_students(self): - """Alert the app to any unmatched students that remain after all Earthmover passes.""" + """Alert the app to any unmatched students that remain after all Earthmover passes. + + This method is only run if enforce_match_threshold determines that enough students have matched that the + input file is basically trustworthy + """ if not self.num_unmatched_students: return @@ -773,12 +756,14 @@ def report_unmatched_students(self): self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): - """Upload Earthmover's outputs to the ODS""" + """Upload an Earthmover output set to the ODS via lightbeam.""" + + if not self.send_to_ods: + return + self.set_action(action.LIGHTBEAM_SEND) - if self.cross_year_pass_ran: - # The records destined for the ODS came from the first Earthmover run and - # now live in the stashed output dir; point lightbeam there. - os.environ["DATA_DIR"] = self.ods_output_dir + # If we ran Earthmover twice, we're only ever sending the first output set + os.environ["DATA_DIR"] = self.output_sets[0].local_dir try: subprocess.run( ["lightbeam", "-c", self.assessment_lightbeam, "send", "--set", "state_dir", ".lightbeam", "--results-file", artifact.LB_SEND_RESULTS.path] @@ -792,84 +777,39 @@ def lightbeam_send(self): self.upload_artifact(artifact.LB_SEND_RESULTS) def compile_summary(self): - """Post results from job to share with the user. - - In a two-pass (ODS + cross-year) run, fold the cross-year records into the - same per-resource counts the lightbeam-send results produced — the user sees - a single combined picture. - """ - if self.send_to_ods: - self._merge_into_summary(self._summary_from_lb_send(artifact.LB_SEND_RESULTS.path)) - if self.cross_year_pass_ran: - self._merge_into_summary(self._summary_from_em_results(artifact.EM_RESULTS_X_YEAR.path)) - else: - self._merge_into_summary(self._summary_from_em_results(artifact.EM_RESULTS.path)) - - if self.summary: + """Send per-resource records-processed/skipped/failed counts to the app.""" + summary = {} + for set in self.output_sets: + for resource, counts in set.counts().items(): + if resource not in summary: + summary[resource] = { + "records_processed": 0, + "records_skipped": 0, + "records_failed": 0, + } + for k in summary[resource]: + summary[resource][k] += counts.get(k, 0) + + if summary: + self.summary = summary self.send_job_summary() - def _summary_from_lb_send(self, results_path): - """Per-resource processed/skipped/failed counts from a lightbeam-send results file.""" - with open(results_path) as f: - lb_send_results = json.load(f) - required_counts = ["records_processed", "records_skipped", "records_failed"] - return { - resource: {k: v for k, v in counts.items() if k in required_counts} - for resource, counts in lb_send_results["resources"].items() - } - - def _summary_from_em_results(self, results_path): - """Per-resource processed counts from an Earthmover results file. - - Used when records are sideloaded rather than sent to an ODS — Earthmover row - counts are the best signal we have for those. - """ - with open(results_path) as f: - em_results = json.load(f) - dest_prefix = "$destinations." - out = {} - for key, count in em_results.get("row_counts", {}).items(): - if key.startswith(dest_prefix): - resource = key[len(dest_prefix):] - out[resource] = { - "records_processed": count, - "records_skipped": 0, - "records_failed": 0, - } - return out - - def _merge_into_summary(self, partial): - for resource, counts in partial.items(): - target = self.summary.setdefault(resource, {}) - for key, value in counts.items(): - target[key] = target.get(key, 0) + value - def upload_output(self): - """Upload Earthmover output set(s) to S3 and notify the app of their location(s). - - In a two-pass run, the first run's output set (sent to the ODS) lives in the - stash dir and the cross-year run's set (sideloaded) lives in OUTPUT_DIR. We - upload them to separate S3 subdirs and send one alert per non-empty set. - """ + """Upload each Earthmover output set to S3 and notify the app of its location.""" self.set_action(action.UPLOAD_OUTPUT) + for set in self.output_sets: + self.upload_output_set(set) - if self.cross_year_pass_ran: - self._upload_output_set(self.ods_output_dir, "ods", sent_to_ods=True) - self._upload_output_set(self.output_dir, "non-ods", sent_to_ods=False) - else: - output_type = "ods" if self.send_to_ods else "non-ods" - self._upload_output_set(self.output_dir, output_type, sent_to_ods=self.send_to_ods) - - def _upload_output_set(self, src_dir, output_type, sent_to_ods): - """Upload all non-empty files in src_dir to // and alert the app.""" - s3_subdir = f"{self.s3_out_path}/{output_type}" + def upload_output_set(self, output_set): + """Upload all non-empty files in one OutputSet's local_dir to its S3 subdir, then alert the app.""" + s3_prefix = f"{self.s3_out_path}/{output_set.s3_subdir}" uploaded = False - for fname in os.listdir(src_dir): - fpath = os.path.join(src_dir, fname) + for fname in os.listdir(output_set.local_dir): + fpath = os.path.join(output_set.local_dir, fname) if not os.path.isfile(fpath) or os.stat(fpath).st_size == 0: continue - dest_fname = f"{s3_subdir}/{fname}" + dest_fname = f"{s3_prefix}/{fname}" self.logger.info(f"uploading output: {fname} -> {dest_fname}") try: self.s3.upload_file(fpath, self.app_bucket, dest_fname) @@ -879,10 +819,10 @@ def _upload_output_set(self, src_dir, output_type, sent_to_ods): uploaded = True if not uploaded: - self.logger.warning(f"no output files found in {src_dir} to upload") + self.logger.warning(f"no Earthmover output files found in {output_set.local_dir} to upload") return - self.send_job_output_alert(s3_subdir, sent_to_ods) + self.send_job_output_alert(s3_prefix, output_set.sent_to_ods) def upload_remaining_artifacts(self): """Attempt to upload all artifacts that have not yet been uploaded""" @@ -993,3 +933,23 @@ def send_job_output_alert(self, s3_prefix, sent_to_ods): def localize_s3_path(path): """Convert an S3 'path' to a single filename""" return path.replace("/", "__") + + +def stream_to_file(session, url, dest_path, max_attempts=3): + """GET url as a stream and write the body to dest_path""" + for attempt in range(1, max_attempts + 1): + try: + with session.get(url, stream=True) as resp: + resp.raise_for_status() + with open(dest_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=64 * 1024): + if chunk: + f.write(chunk) + return + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as e: + if attempt == max_attempts: + raise + sleep_seconds = attempt * 5 + print(f"stream_to_file: attempt {attempt}/{max_attempts} failed ({type(e).__name__}); retrying in {sleep_seconds}s") + time.sleep(sleep_seconds) From cf9d56cf4d58523d4298a4e6aa16eebc1f4e058a Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 13 May 2026 18:33:21 -0500 Subject: [PATCH 04/20] add output_sets --- executor/executor/output_sets.py | 41 ++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 executor/executor/output_sets.py diff --git a/executor/executor/output_sets.py b/executor/executor/output_sets.py new file mode 100644 index 00000000..151d310a --- /dev/null +++ b/executor/executor/output_sets.py @@ -0,0 +1,41 @@ +# An OutputSet describes one collection of Earthmover output files that a job +# produces and uploads to S3. Every job produces at least one output set, which +# may or may not be sent to the ODS. Jobs that perform cross-year matching produce +# two output sets. + +import json + + +class OutputSet: + def __init__(self, local_dir, s3_subdir, sent_to_ods, em_results_path, lb_send_results_path=None): + # Local directory containing the output files Earthmover produced for this set. + self.local_dir = local_dir + # Subdirectory under the job's S3 output path that these files land in. + # The app reads this to distinguish ODS-bound output from sideloaded output. + self.s3_subdir = s3_subdir + # Whether the records in this set were (or will be) sent to an ODS via lightbeam. + self.sent_to_ods = sent_to_ods + # Path to the Earthmover results JSON for the run that produced this set. + self.em_results_path = em_results_path + # Path to the lightbeam-send results JSON, if sent_to_ods + self.lb_send_results_path = lb_send_results_path + + def counts(self): + """Per-resource records-processed/skipped/failed counts contributed by this set. + + For ODS-bound sets, lightbeam's send-results are authoritative (they know + what the ODS actually accepted). For sideloaded sets, Earthmover row counts + are the best signal we have. + """ + if self.sent_to_ods: + with open(self.lb_send_results_path) as f: + return json.load(f)["resources"] + + with open(self.em_results_path) as f: + em_results = json.load(f) + dest_prefix = "$destinations." + return { + key[len(dest_prefix):]: {"records_processed": count} + for key, count in em_results.get("row_counts", {}).items() + if key.startswith(dest_prefix) + } From eaffcba6b4adbda1ff9847bcab6a438dda66c038 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 13 May 2026 21:10:58 -0500 Subject: [PATCH 05/20] lower threshold in EM params --- executor/executor/executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index e50ce08b..06c2f536 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -534,6 +534,8 @@ def cross_year_pass(self, primary): os.environ["EDFI_STUDENT_ID_TYPES"] = ( "" if first_run_id_type == "studentUniqueId" else first_run_id_type ) + # we already know which ID to use so we should succeed no matter how many failed matches remain + os.environ["REQUIRED_ID_MATCH_RATE"] = "0.0" self.logger.info( f"cross-year pass: matching on {first_run_id_name} / {first_run_id_type}" ) From 44fec94e9005ec1849cdb91c24d64f6abab173f0 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 14:58:41 -0500 Subject: [PATCH 06/20] fix needs_upload --- executor/executor/artifacts.py | 1 + executor/executor/executor.py | 28 ++++++++++++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/executor/executor/artifacts.py b/executor/executor/artifacts.py index 5faa21de..6ad2b9aa 100644 --- a/executor/executor/artifacts.py +++ b/executor/executor/artifacts.py @@ -11,6 +11,7 @@ class JobArtifact: def __init__(self, name, path, needs_upload=True): self.name = name self.path = path + self.needs_upload = needs_upload ROSTER = JobArtifact( diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 06c2f536..b89d435f 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -60,8 +60,10 @@ def __init__(self): self.local_mode = os.environ.get("DEPLOYMENT_MODE") == "LOCAL" self.conn = requests.Session() - # wipe lightbeam state so that local runs stay idempotent + # wipe state left behind by a prior local run so reruns are idempotent shutil.rmtree(".lightbeam", ignore_errors=True) + shutil.rmtree(config.OUTPUT_DIR_FIRST_RUN, ignore_errors=True) + shutil.rmtree(config.ROSTER_DOWNLOAD_DIR, ignore_errors=True) self.output_dir = os.path.abspath(config.OUTPUT_DIR) os.mkdir(self.output_dir) @@ -125,15 +127,29 @@ def execute(self): except: # failure cases traceback.print_exc() - self.upload_remaining_artifacts() - - self.update_failure() + # Lock in the error payload immediately so any failure below this point + # still has something to report to the app. if not self.error: self.error = error.UnknownError(traceback.format_exc()) if not self.error.stacktrace: self.error.stacktrace = traceback.format_exc() + # Best-effort cleanup. Failures here must not prevent us from reporting + # the error to the app, which is the most important thing this branch does. + try: + self.upload_remaining_artifacts() + except Exception: + self.logger.exception("upload_remaining_artifacts raised during shutdown; continuing") + + # update_failure runs before send_error so the FAILURE status update + # arrives at the app before the error payload (avoiding the race the + # method's own docstring describes). + try: + self.update_failure() + except Exception: + self.logger.exception("update_failure raised during shutdown; continuing") + self.send_error() else: # success case @@ -848,7 +864,7 @@ def upload_artifact(self, artifact_to_upload, fail_ok=False): if fail_ok: self.logger.debug(f"file empty during shutdown. continuing...") return - self.error_obj = error.ArtifactEmptyError(artifact_to_upload.name, fpath) + self.error = error.ArtifactEmptyError(artifact_to_upload.name, fpath) raise FileNotFoundError(fpath) try: @@ -860,7 +876,7 @@ def upload_artifact(self, artifact_to_upload, fail_ok=False): self.logger.debug(f"upload failed during shutdown. continuing...") return self.error = error.ArtifactS3UploadError( - artifact_to_upload, f"{self.bucket_out_path}/{os.path.basename(fpath)}" + artifact_to_upload.name, f"{self.s3_out_path}/{os.path.basename(fpath)}" ) raise From 9b83af799c04fcc558df47121d8ea2e6d6192f7e Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 15:08:08 -0500 Subject: [PATCH 07/20] log exception body --- executor/executor/executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index b89d435f..054924f2 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -139,16 +139,16 @@ def execute(self): # the error to the app, which is the most important thing this branch does. try: self.upload_remaining_artifacts() - except Exception: - self.logger.exception("upload_remaining_artifacts raised during shutdown; continuing") + except Exception as e: + self.logger.error(f"upload_remaining_artifacts raised during shutdown ({repr(e)}); continuing", exc_info=True) # update_failure runs before send_error so the FAILURE status update # arrives at the app before the error payload (avoiding the race the # method's own docstring describes). try: self.update_failure() - except Exception: - self.logger.exception("update_failure raised during shutdown; continuing") + except Exception as e: + self.logger.error(f"update_failure raised during shutdown ({repr(e)}); continuing", exc_info=True) self.send_error() else: From ad3d790605db93edcadd47d23a83b92ac7733c7f Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 17:01:15 -0500 Subject: [PATCH 08/20] integrate edge-case fix --- executor/executor/executor.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 054924f2..8965227b 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -517,18 +517,23 @@ def earthmover_run(self, results_path, encoding_override=None): except subprocess.CalledProcessError: # failed again, move on to shutdown procedure pass - finally: - # the app relies on the presence of the unmatched students file but does not check - # whether it is populated. It is possible that Earthmover successfully matches students - # (so the file is empty) but then fails during data transformation. We need to check the - # match rates whether or not Earthmover succeeds, so that we don't accidentally tell the - # user there are unmatched students when there are none - self.record_highest_match_rate() - if fatal: - # shut it down - self.error = error.EarthmoverRunError() - # generic exception that will be caught, with em.stderr reported as the stacktrace - raise Exception(em.stderr) + + if fatal: + # It is possible that Earthmover successfully matches students but then + # fails during data transformation. This is most likely to happen when the user + # uploads a file that is recognizable as the correct assessment but has some + # other flaw - for example, the file is from the wrong year. + # In this case. we end up with a match rates file and unmatched students file, + # but we don't want to send either of them to the app. All the user needs to know + # is that the run failed. + artifact.MATCH_RATES.needs_upload = False + artifact.UNMATCHED_STUDENTS.needs_upload = False + # Anyway, yeah, the run failed. Shut it down. + self.error = error.EarthmoverRunError() + # generic exception that will be caught, with em.stderr reported as the stacktrace + raise Exception(em.stderr) + + self.record_highest_match_rate() def cross_year_pass(self, primary): """Run a second Earthmover pass against a cross-year roster in an attempt to match more students.""" From 62a829a4c051cb5d66a3a4a362bb29816a0f1b21 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 17:08:32 -0500 Subject: [PATCH 09/20] clean up language --- executor/executor/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 8965227b..b3367266 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -445,7 +445,7 @@ def orchestrate_earthmover(self): )] if (self.send_to_ods # i.e. we've only tried matching this year's students so far and self.cross_year_match_available # and we have access to EDU - and bool(self.num_unmatched_students) # and there are unmatched students from the first run + and bool(self.num_unmatched_students) # and there are unmatched students from the first pass ): # then take a second pass with the cross-year roster from EDU # and thus produce a second output set to be sideloaded @@ -549,7 +549,7 @@ def cross_year_pass(self, primary): self.get_roster_from_edu() os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) - # Constrain to the ID column the first run matched on. The bundle always appends + # Constrain to the ID column the first pass matched on. The bundle always appends # studentUniqueId internally, so we pass an empty list when that's what won. os.environ["POSSIBLE_STUDENT_ID_COLUMNS"] = first_run_id_name os.environ["EDFI_STUDENT_ID_TYPES"] = ( From 024e7c15024582723409dab8d1309c84749478ae Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 17:29:34 -0500 Subject: [PATCH 10/20] rerun on unmatched students file --- executor/executor/executor.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index b3367266..378f0e87 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -431,7 +431,6 @@ def orchestrate_earthmover(self): os.environ["EDFI_STUDENT_ID_TYPES"] = ",".join(self.distinct_id_types) self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") - self.check_input_encoding() self.earthmover_run(artifact.EM_RESULTS.path) self.upload_artifact(artifact.EM_RESULTS) self.enforce_match_threshold() @@ -455,16 +454,12 @@ def orchestrate_earthmover(self): self.upload_artifact(artifact.MATCH_RATES) self.report_unmatched_students() - def earthmover_run(self, results_path, encoding_override=None): + def earthmover_run(self, results_path): """Compile and run Earthmover into the given results directory.""" - if encoding_override: - # Case 1: second pass, so we know what encoding already worked - encoding = encoding_override - # Case 2: first pass and chardet identified an encoding we think is plausible, so use it - elif self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: + self.check_input_encoding() + if self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) else: - # Case 3: first pass and we don't have a good guess so just use UTF-8 encoding = None encoding_args = ["--set", "sources.input.encoding", encoding] if encoding else [] if encoding: @@ -491,19 +486,13 @@ def earthmover_run(self, results_path, encoding_override=None): if em.stderr: self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() - - self.successful_encoding = encoding except subprocess.CalledProcessError as err: self.logger.warning("earthmover encountered an error") fatal = True # yes it's brittle to check the error against a string like this, but this message hasn't # changed since 2007(!) -> https://github.com/python/cpython/blame/main/Objects/exceptions.c - if ( - not encoding_override # i.e. only try again if this is the first pass and we're still uncertain about the encoding - and err.stderr and "codec can't decode" in err.stderr - and encoding != "iso-8859-1" - ): + if err.stderr and "codec can't decode" in err.stderr and encoding != "iso-8859-1": self.logger.error(f"Failed to read file with {encoding} encoding. Retrying with Latin1...") try: # attempt no. 2 - need a new em object to overwrite the decoding error @@ -513,7 +502,6 @@ def earthmover_run(self, results_path, encoding_override=None): em.check_returncode() fatal = False # if we made it this far, we can abort the shutdown - self.successful_encoding = "iso-8859-1" except subprocess.CalledProcessError: # failed again, move on to shutdown procedure pass @@ -546,6 +534,10 @@ def cross_year_pass(self, primary): primary.local_dir = first_run_output_dir os.mkdir(self.output_dir) + unmatched_path = os.path.join(first_run_output_dir, os.path.basename(artifact.UNMATCHED_STUDENTS.path)) + os.environ["INPUT_FILE"] = unmatched_path + self.input_sources["INPUT_FILE"]["path"] = unmatched_path + self.get_roster_from_edu() os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) @@ -561,10 +553,7 @@ def cross_year_pass(self, primary): f"cross-year pass: matching on {first_run_id_name} / {first_run_id_type}" ) - self.earthmover_run( - artifact.EM_RESULTS_X_YEAR.path, - encoding_override=self.successful_encoding, - ) + self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) artifact.EM_RESULTS_X_YEAR.needs_upload = True self.upload_artifact(artifact.EM_RESULTS_X_YEAR) From 8baaf0b1cdd81a8f25e929ac5c7f5b6f91610fca Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 17:32:30 -0500 Subject: [PATCH 11/20] better integrate other PR --- executor/executor/executor.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 378f0e87..84dd94e6 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -128,23 +128,12 @@ def execute(self): # failure cases traceback.print_exc() - # Lock in the error payload immediately so any failure below this point - # still has something to report to the app. - if not self.error: - self.error = error.UnknownError(traceback.format_exc()) - if not self.error.stacktrace: - self.error.stacktrace = traceback.format_exc() - - # Best-effort cleanup. Failures here must not prevent us from reporting - # the error to the app, which is the most important thing this branch does. + # generic exception catching to be super-defensive while we cleanup and make a best effort to get the error object out the door try: self.upload_remaining_artifacts() except Exception as e: self.logger.error(f"upload_remaining_artifacts raised during shutdown ({repr(e)}); continuing", exc_info=True) - # update_failure runs before send_error so the FAILURE status update - # arrives at the app before the error payload (avoiding the race the - # method's own docstring describes). try: self.update_failure() except Exception as e: From b01bdfc396cdbd84dea6ae11648499b4b767f8f6 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Thu, 14 May 2026 17:38:26 -0500 Subject: [PATCH 12/20] comments --- executor/executor/executor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 84dd94e6..196b9336 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -415,7 +415,7 @@ def orchestrate_earthmover(self): """ self.set_action(action.EARTHMOVER_RUN) - # first pass, possibly aborting if there are too few matches + # first pass self.unpack_id_types() os.environ["EDFI_STUDENT_ID_TYPES"] = ",".join(self.distinct_id_types) self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") @@ -431,6 +431,8 @@ def orchestrate_earthmover(self): em_results_path=artifact.EM_RESULTS.path, lb_send_results_path=artifact.LB_SEND_RESULTS.path if self.send_to_ods else None, )] + + # if we're here, the first pass of Earthmover was successful if (self.send_to_ods # i.e. we've only tried matching this year's students so far and self.cross_year_match_available # and we have access to EDU and bool(self.num_unmatched_students) # and there are unmatched students from the first pass @@ -513,7 +515,7 @@ def earthmover_run(self, results_path): self.record_highest_match_rate() def cross_year_pass(self, primary): - """Run a second Earthmover pass against a cross-year roster in an attempt to match more students.""" + """Run a second Earthmover pass on unmatched students using a cross-year roster in an attempt to match more students.""" # Capture first-run match info before earthmover_run overwrites it. first_run_id_name = self.highest_match_id_name first_run_id_type = self.highest_match_id_type @@ -523,6 +525,7 @@ def cross_year_pass(self, primary): primary.local_dir = first_run_output_dir os.mkdir(self.output_dir) + # use only the students who failed to match the primary ID from the first run unmatched_path = os.path.join(first_run_output_dir, os.path.basename(artifact.UNMATCHED_STUDENTS.path)) os.environ["INPUT_FILE"] = unmatched_path self.input_sources["INPUT_FILE"]["path"] = unmatched_path From b80f5d6873ee628ae5f1f4e3525fd3c9bee6149b Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Tue, 26 May 2026 18:15:08 -0500 Subject: [PATCH 13/20] handle the unmatched students edge cases --- executor/executor/executor.py | 114 ++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 47 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 196b9336..71d6f408 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -422,6 +422,7 @@ def orchestrate_earthmover(self): self.earthmover_run(artifact.EM_RESULTS.path) self.upload_artifact(artifact.EM_RESULTS) + self.record_highest_match_rate() self.enforce_match_threshold() self.output_sets = [OutputSet( @@ -433,9 +434,9 @@ def orchestrate_earthmover(self): )] # if we're here, the first pass of Earthmover was successful - if (self.send_to_ods # i.e. we've only tried matching this year's students so far - and self.cross_year_match_available # and we have access to EDU - and bool(self.num_unmatched_students) # and there are unmatched students from the first pass + if (self.send_to_ods # i.e. we've only tried matching this year's students so far + and self.cross_year_match_available # and we have access to EDU + and self.num_unmatched_students > 0 # and there are unmatched students from the first pass ): # then take a second pass with the cross-year roster from EDU # and thus produce a second output set to be sideloaded @@ -512,11 +513,11 @@ def earthmover_run(self, results_path): # generic exception that will be caught, with em.stderr reported as the stacktrace raise Exception(em.stderr) - self.record_highest_match_rate() - def cross_year_pass(self, primary): """Run a second Earthmover pass on unmatched students using a cross-year roster in an attempt to match more students.""" - # Capture first-run match info before earthmover_run overwrites it. + # The first pass established which ID column matched best; the second pass is + # constrained to that same column. The count is the fallback for the "zero + # matches" case below. first_run_id_name = self.highest_match_id_name first_run_id_type = self.highest_match_id_type @@ -549,6 +550,20 @@ def cross_year_pass(self, primary): artifact.EM_RESULTS_X_YEAR.needs_upload = True self.upload_artifact(artifact.EM_RESULTS_X_YEAR) + count = count_unmatched_students() + if count is None: + # Edge case alert! It may be that in the second pass, there are no matches even with the + # expanded universe of students. Of course we need to return these rows to the user and tell + # them how many records failed to match. However, with no matches, our usual method of counting + # via the match rates file breaks down, because that file is now empty. So here we are saying + # "if there were no matches on the second pass, use the number of unmatched from the first pass" + count = self.num_unmatched_students + + # then, in either case, we do the usual thing: upload the unmatched students file if and only if + # there are unmatched students + self.num_unmatched_students = count + artifact.UNMATCHED_STUDENTS.needs_upload = count > 0 + return OutputSet( local_dir=self.output_dir, s3_subdir="non-ods", @@ -672,58 +687,41 @@ def record_highest_match_rate(self): # local ID, then the 4/10 without matching state IDs go in the unmatched students file, but both types # of match are recorded in the match rates file. Both are used for different purposes by the app and # within the exeuctor - # - # There are three circumstances under which we don't upload the unmatched students file and thus don't - # show the "some students failed to match" message to the user. These are: - # 1. Earthmover failed, so we don't know how many students matched - # 2. The match rates file tells us that there is a perfect match, so there is nothing to upload - # 3. The match rates file is empty, so there were no matches and the unmatched students file is the same as the original input + # Why do we use the match rates file to count unmatched students? Because the unmatched students file + # may have multiple header rows, so it's hard to use it for counting: + # ref: https://github.com/edanalytics/runway/pull/6 # in the case of literally zero matches (or a failed run), the file is empty and these defaults remain self.highest_match_rate = 0.0 self.highest_match_id_name = "N/A" self.highest_match_id_type = "N/A" - # but if the file is empty, we don't learn this number. We need to distinguish between 0 and "don't know" self.num_unmatched_students = None - try: - with open(artifact.MATCH_RATES.path) as f: - match_rates = [ - {k: v for k, v in row.items()} - for row in csv.DictReader(f, skipinitialspace=True) - ] - except FileNotFoundError: - # case 1 - self.logger.debug("failed Earthmover run. Skipping upload of unmatched students file") + match_rates = load_match_rates() + if len(match_rates) == 0: + # either + # - Earthmover failed and so we don't report unmatched students + # - OR no students matched, which in the first pass means we've hit a more fundamental error + # in both cases we usually have already exited out of the run prior to now, but we retain + # this as a guardrail + self.logger.debug("no match rates available. Skipping upload of unmatched students file") artifact.UNMATCHED_STUDENTS.needs_upload = False return - if len(match_rates) > 0: - self.logger.info(f"at least some records matched - match rates by ID: {match_rates}") - - highest_match = sorted(match_rates, reverse=True, key=lambda mr: float(mr['match_rate']))[0] - self.highest_match_rate = float(highest_match["match_rate"]) - self.highest_match_id_name = highest_match["source_column_name"] - self.highest_match_id_type = highest_match["edfi_column_name"] - self.num_unmatched_students = int(highest_match["num_rows"]) - int(highest_match["num_matches"]) + self.logger.info(f"at least some records matched - match rates by ID: {match_rates}") + highest_match = sorted(match_rates, reverse=True, key=lambda mr: float(mr['match_rate']))[0] + self.highest_match_rate = float(highest_match["match_rate"]) + self.highest_match_id_name = highest_match["source_column_name"] + self.highest_match_id_type = highest_match["edfi_column_name"] + self.num_unmatched_students = int(highest_match["num_rows"]) - int(highest_match["num_matches"]) - if self.num_unmatched_students == 0: - # case 2 - self.logger.debug("all records matched. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False - else: - # case 3 - self.logger.debug("no students matched any ID. Skipping upload of unmatched students file") + if self.num_unmatched_students == 0: + self.logger.debug("all records matched. Skipping upload of unmatched students file") artifact.UNMATCHED_STUDENTS.needs_upload = False def enforce_match_threshold(self): - """Halt if the primary Earthmover run's best match rate is below the configured threshold. - - Sub-threshold matches almost always indicate the wrong file was uploaded; in that case - we want to abort before doing any further work (including a cross-year second pass) - and surface a clear error to the user. - """ - if not self.num_unmatched_students: + """Halt if the primary Earthmover run's best match rate is below the configured threshold""" + if self.num_unmatched_students == 0: return if self.highest_match_rate >= config.REQUIRED_ID_MATCH_RATE: return @@ -736,7 +734,7 @@ def enforce_match_threshold(self): # if we don't return the unmatched students file at all self.logger.debug("too many unmatched students. Skipping upload") artifact.UNMATCHED_STUDENTS.needs_upload = False - raise ValueError("insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") + raise ValueError(f"insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") def report_unmatched_students(self): """Alert the app to any unmatched students that remain after all Earthmover passes. @@ -744,7 +742,7 @@ def report_unmatched_students(self): This method is only run if enforce_match_threshold determines that enough students have matched that the input file is basically trustworthy """ - if not self.num_unmatched_students: + if self.num_unmatched_students == 0: return self.logger.warning('earthmover run failed to match some student IDs') @@ -805,10 +803,12 @@ def upload_output(self): self.upload_output_set(set) def upload_output_set(self, output_set): - """Upload all non-empty files in one OutputSet's local_dir to its S3 subdir, then alert the app.""" + """Upload all non-empty JSONL files in one OutputSet's local_dir to its S3 subdir, then alert the app.""" s3_prefix = f"{self.s3_out_path}/{output_set.s3_subdir}" uploaded = False for fname in os.listdir(output_set.local_dir): + if not fname.endswith(".jsonl"): + continue fpath = os.path.join(output_set.local_dir, fname) if not os.path.isfile(fpath) or os.stat(fpath).st_size == 0: continue @@ -939,6 +939,26 @@ def localize_s3_path(path): return path.replace("/", "__") +def load_match_rates(): + """Read the latest Earthmover run's match_rates.csv. Returns an empty list when the file is missing or has no data rows.""" + try: + with open(artifact.MATCH_RATES.path) as f: + return list(csv.DictReader(f, skipinitialspace=True)) + except FileNotFoundError: + return [] + + +def count_unmatched_students(): + """Number of unmatched students reported by the latest Earthmover run's match rates file + + Returns None when the match rates file is empty, signifying no matches or a failed run + """ + rows = load_match_rates() + if len(rows) == 0: + return None + return int(rows[0]["num_rows"]) - int(rows[0]["num_matches"]) + + def stream_to_file(session, url, dest_path, max_attempts=3): """GET url as a stream and write the body to dest_path""" for attempt in range(1, max_attempts + 1): From dea79eafa2f148261f6b26d7e71d9317498abe8d Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 08:22:21 -0500 Subject: [PATCH 14/20] clean up comments --- executor/executor/executor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 71d6f408..2dede32e 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -515,9 +515,7 @@ def earthmover_run(self, results_path): def cross_year_pass(self, primary): """Run a second Earthmover pass on unmatched students using a cross-year roster in an attempt to match more students.""" - # The first pass established which ID column matched best; the second pass is - # constrained to that same column. The count is the fallback for the "zero - # matches" case below. + # constrain this pass to use the IDs that matched best in the first pass first_run_id_name = self.highest_match_id_name first_run_id_type = self.highest_match_id_type @@ -543,7 +541,7 @@ def cross_year_pass(self, primary): # we already know which ID to use so we should succeed no matter how many failed matches remain os.environ["REQUIRED_ID_MATCH_RATE"] = "0.0" self.logger.info( - f"cross-year pass: matching on {first_run_id_name} / {first_run_id_type}" + f"cross-year pass: matching on {first_run_id_name} ({first_run_id_type} ID)" ) self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) From cdbb6f047ed6d4fda8d97c1ac3c73f9c70ffc251 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 09:32:30 -0500 Subject: [PATCH 15/20] fix stacktrace reporting --- executor/executor/executor.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 2dede32e..559715a8 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -128,6 +128,13 @@ def execute(self): # failure cases traceback.print_exc() + # Lock in the error payload immediately so any failure below this point + # still has something to report to the app. + if not self.error: + self.error = error.UnknownError(traceback.format_exc()) + if not self.error.stacktrace: + self.error.stacktrace = traceback.format_exc() + # generic exception catching to be super-defensive while we cleanup and make a best effort to get the error object out the door try: self.upload_remaining_artifacts() @@ -284,7 +291,7 @@ def get_student_roster(self): def get_roster_from_edu(self): """Query EDU via the Runway app and load cross-year roster data into a JSONL file""" - self.logger.info(f"streaming cross-year roster from {self.cross_year_roster_url}") + self.logger.info(f"cross-year pass: streaming cross-year roster") dest_path = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) try: stream_to_file(self.conn, self.cross_year_roster_url, dest_path) @@ -460,7 +467,9 @@ def earthmover_run(self, results_path): fatal = False try: em = subprocess.run( - ["earthmover", "-c", self.wrapper_earthmover, "compile"] + ["earthmover", "-c", self.wrapper_earthmover, "compile"], + capture_output=True, + text=True ) em.check_returncode() @@ -479,7 +488,7 @@ def earthmover_run(self, results_path): self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() except subprocess.CalledProcessError as err: - self.logger.warning("earthmover encountered an error") + self.logger.error("earthmover encountered an error") fatal = True # yes it's brittle to check the error against a string like this, but this message hasn't @@ -540,9 +549,7 @@ def cross_year_pass(self, primary): ) # we already know which ID to use so we should succeed no matter how many failed matches remain os.environ["REQUIRED_ID_MATCH_RATE"] = "0.0" - self.logger.info( - f"cross-year pass: matching on {first_run_id_name} ({first_run_id_type} ID)" - ) + self.logger.info(f"cross-year pass: matching on {first_run_id_name} ({first_run_id_type} ID)") self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) artifact.EM_RESULTS_X_YEAR.needs_upload = True @@ -555,7 +562,10 @@ def cross_year_pass(self, primary): # them how many records failed to match. However, with no matches, our usual method of counting # via the match rates file breaks down, because that file is now empty. So here we are saying # "if there were no matches on the second pass, use the number of unmatched from the first pass" + self.logger.warning("cross-year pass: no additional matches. Falling back to original unmatched students file") count = self.num_unmatched_students + else: + self.logger.warning(f"cross-year pass: {count} unmatched students remain") # then, in either case, we do the usual thing: upload the unmatched students file if and only if # there are unmatched students From 11f11f9388097087e8dddf4e336c961c8c0983d8 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 10:28:06 -0500 Subject: [PATCH 16/20] integrate lightbeam guardrail --- executor/executor/executor.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 559715a8..eaac38b6 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -775,17 +775,33 @@ def lightbeam_send(self): # If we ran Earthmover twice, we're only ever sending the first output set os.environ["DATA_DIR"] = self.output_sets[0].local_dir try: - subprocess.run( - ["lightbeam", "-c", self.assessment_lightbeam, "send", "--set", "state_dir", ".lightbeam", "--results-file", artifact.LB_SEND_RESULTS.path] - ).check_returncode() + lb = subprocess.run( + ["lightbeam", "-c", self.assessment_lightbeam, "send", "--results-file", artifact.LB_SEND_RESULTS.path] + ) + if lb.stdout: + self.logger.info(f"lightbeam stdout: {lb.stdout}") + if lb.stderr: + self.logger.info(f"lightbeam stderr: {lb.stderr}") + lb.check_returncode() - # TODO: ostensibly should check for Ed-Fi warnings here but failed uploads still make it back via the summary report except subprocess.CalledProcessError: - self.error = error.LightbeamSendError() - raise + self.error = error.LightbeamSendError(lb.stderr) self.upload_artifact(artifact.LB_SEND_RESULTS) + # place an additional guardrail around the case when everything fails to send, + # likely due to a descriptor or namespace issue. We don't want to continue forward in that case + with open(artifact.LB_SEND_RESULTS.path) as f: + send_results = json.load(f)["resources"] + + all_failed = all( + counts.get("records_processed", 0) == counts.get("records_failed", 0) + for counts in send_results.values() + ) + if all_failed: + self.error = error.LightbeamSendError() + raise ValueError("all output data failed to send") + def compile_summary(self): """Send per-resource records-processed/skipped/failed counts to the app.""" summary = {} From b839bb69f5a59790a607f21076b9fe1645accc01 Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 13:22:19 -0500 Subject: [PATCH 17/20] integrate lightbeam guardrail --- executor/executor/executor.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index eaac38b6..c1c6015f 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -122,6 +122,7 @@ def execute(self): self.upload_output() self.compile_summary() + self.upload_remaining_artifacts() # NOTE: all specific exceptions are handled in sub-methods except: @@ -287,8 +288,6 @@ def get_student_roster(self): # only running Earthmover once with uploaded roster self.get_roster_from_s3() - self.upload_artifact(artifact.ROSTER) - def get_roster_from_edu(self): """Query EDU via the Runway app and load cross-year roster data into a JSONL file""" self.logger.info(f"cross-year pass: streaming cross-year roster") @@ -428,7 +427,6 @@ def orchestrate_earthmover(self): self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") self.earthmover_run(artifact.EM_RESULTS.path) - self.upload_artifact(artifact.EM_RESULTS) self.record_highest_match_rate() self.enforce_match_threshold() @@ -450,7 +448,6 @@ def orchestrate_earthmover(self): cross_year_output = self.cross_year_pass(self.output_sets[0]) self.output_sets.append(cross_year_output) - self.upload_artifact(artifact.MATCH_RATES) self.report_unmatched_students() def earthmover_run(self, results_path): @@ -553,7 +550,6 @@ def cross_year_pass(self, primary): self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) artifact.EM_RESULTS_X_YEAR.needs_upload = True - self.upload_artifact(artifact.EM_RESULTS_X_YEAR) count = count_unmatched_students() if count is None: @@ -763,7 +759,6 @@ def report_unmatched_students(self): # additional context so the app can help the user fix their file # in this case, num_unmatched_students is guaranteed to be an int instead of None self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) - self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): """Upload an Earthmover output set to the ODS via lightbeam.""" @@ -787,8 +782,6 @@ def lightbeam_send(self): except subprocess.CalledProcessError: self.error = error.LightbeamSendError(lb.stderr) - self.upload_artifact(artifact.LB_SEND_RESULTS) - # place an additional guardrail around the case when everything fails to send, # likely due to a descriptor or namespace issue. We don't want to continue forward in that case with open(artifact.LB_SEND_RESULTS.path) as f: From 2d3e9f7f9ba2642f21f6aa9aa1e4a7ac9321151e Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 13:26:53 -0500 Subject: [PATCH 18/20] more surgical move --- executor/executor/executor.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index c1c6015f..45e03ce8 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -120,9 +120,9 @@ def execute(self): self.lightbeam_send() + self.report_unmatched_students() self.upload_output() self.compile_summary() - self.upload_remaining_artifacts() # NOTE: all specific exceptions are handled in sub-methods except: @@ -288,6 +288,8 @@ def get_student_roster(self): # only running Earthmover once with uploaded roster self.get_roster_from_s3() + self.upload_artifact(artifact.ROSTER) + def get_roster_from_edu(self): """Query EDU via the Runway app and load cross-year roster data into a JSONL file""" self.logger.info(f"cross-year pass: streaming cross-year roster") @@ -427,6 +429,7 @@ def orchestrate_earthmover(self): self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") self.earthmover_run(artifact.EM_RESULTS.path) + self.upload_artifact(artifact.EM_RESULTS) self.record_highest_match_rate() self.enforce_match_threshold() @@ -448,7 +451,7 @@ def orchestrate_earthmover(self): cross_year_output = self.cross_year_pass(self.output_sets[0]) self.output_sets.append(cross_year_output) - self.report_unmatched_students() + self.upload_artifact(artifact.MATCH_RATES) def earthmover_run(self, results_path): """Compile and run Earthmover into the given results directory.""" @@ -550,6 +553,7 @@ def cross_year_pass(self, primary): self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) artifact.EM_RESULTS_X_YEAR.needs_upload = True + self.upload_artifact(artifact.EM_RESULTS_X_YEAR) count = count_unmatched_students() if count is None: @@ -743,8 +747,10 @@ def enforce_match_threshold(self): def report_unmatched_students(self): """Alert the app to any unmatched students that remain after all Earthmover passes. - This method is only run if enforce_match_threshold determines that enough students have matched that the + This method is only run if + - enforce_match_threshold determines that enough students have matched that the input file is basically trustworthy + - lightbeam_send does not fail """ if self.num_unmatched_students == 0: return @@ -759,6 +765,7 @@ def report_unmatched_students(self): # additional context so the app can help the user fix their file # in this case, num_unmatched_students is guaranteed to be an int instead of None self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) + self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): """Upload an Earthmover output set to the ODS via lightbeam.""" @@ -782,6 +789,8 @@ def lightbeam_send(self): except subprocess.CalledProcessError: self.error = error.LightbeamSendError(lb.stderr) + self.upload_artifact(artifact.LB_SEND_RESULTS) + # place an additional guardrail around the case when everything fails to send, # likely due to a descriptor or namespace issue. We don't want to continue forward in that case with open(artifact.LB_SEND_RESULTS.path) as f: From b03dac0e02d5e82dfcae7b3ebd59869fad981b5a Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Wed, 27 May 2026 13:54:04 -0500 Subject: [PATCH 19/20] clarify default upload status --- executor/executor/artifacts.py | 3 +++ executor/executor/executor.py | 25 ++++++++----------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/executor/executor/artifacts.py b/executor/executor/artifacts.py index 6ad2b9aa..01b92e34 100644 --- a/executor/executor/artifacts.py +++ b/executor/executor/artifacts.py @@ -33,9 +33,12 @@ def __init__(self, name, path, needs_upload=True): "student_id_match_rates", os.path.join(config.OUTPUT_DIR, ("student_id_match_rates.csv")), ) +# Uploading the unmatched students file impacts the UX; we must be careful to only do so +# in the event of an otherwise successful run that has a tolerable number of unmatched input records UNMATCHED_STUDENTS = JobArtifact( "unmatched_students", os.path.join(config.OUTPUT_DIR, "input_no_student_id_match.csv"), + False ) LB_SEND_RESULTS = JobArtifact( "lightbeam_send_results", diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 45e03ce8..93971283 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -512,11 +512,9 @@ def earthmover_run(self, results_path): # fails during data transformation. This is most likely to happen when the user # uploads a file that is recognizable as the correct assessment but has some # other flaw - for example, the file is from the wrong year. - # In this case. we end up with a match rates file and unmatched students file, - # but we don't want to send either of them to the app. All the user needs to know - # is that the run failed. + # In this case. we end up with a match rates file, but we don't want to send it + # to the app. All the user needs to know is that the run failed. artifact.MATCH_RATES.needs_upload = False - artifact.UNMATCHED_STUDENTS.needs_upload = False # Anyway, yeah, the run failed. Shut it down. self.error = error.EarthmoverRunError() # generic exception that will be caught, with em.stderr reported as the stacktrace @@ -570,7 +568,6 @@ def cross_year_pass(self, primary): # then, in either case, we do the usual thing: upload the unmatched students file if and only if # there are unmatched students self.num_unmatched_students = count - artifact.UNMATCHED_STUDENTS.needs_upload = count > 0 return OutputSet( local_dir=self.output_dir, @@ -703,6 +700,8 @@ def record_highest_match_rate(self): self.highest_match_rate = 0.0 self.highest_match_id_name = "N/A" self.highest_match_id_type = "N/A" + # we set this to distinguish from the case where the run succeeded and num_unmatched_students is 0. + # If this value is not properly set then enforce_match_threshold fails, as intended self.num_unmatched_students = None match_rates = load_match_rates() @@ -712,8 +711,6 @@ def record_highest_match_rate(self): # - OR no students matched, which in the first pass means we've hit a more fundamental error # in both cases we usually have already exited out of the run prior to now, but we retain # this as a guardrail - self.logger.debug("no match rates available. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False return self.logger.info(f"at least some records matched - match rates by ID: {match_rates}") @@ -724,8 +721,7 @@ def record_highest_match_rate(self): self.num_unmatched_students = int(highest_match["num_rows"]) - int(highest_match["num_matches"]) if self.num_unmatched_students == 0: - self.logger.debug("all records matched. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False + self.logger.debug("all input records matched") def enforce_match_threshold(self): """Halt if the primary Earthmover run's best match rate is below the configured threshold""" @@ -740,17 +736,11 @@ def enforce_match_threshold(self): ) # For now, since we're asking the user to revisit their entire file, it's simpler # if we don't return the unmatched students file at all - self.logger.debug("too many unmatched students. Skipping upload") - artifact.UNMATCHED_STUDENTS.needs_upload = False + self.logger.debug("too many unmatched students. Halting run") raise ValueError(f"insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") def report_unmatched_students(self): - """Alert the app to any unmatched students that remain after all Earthmover passes. - - This method is only run if - - enforce_match_threshold determines that enough students have matched that the - input file is basically trustworthy - - lightbeam_send does not fail + """At the end of a successful run, alert the app to any unmatched students that remain after all Earthmover passes. """ if self.num_unmatched_students == 0: return @@ -765,6 +755,7 @@ def report_unmatched_students(self): # additional context so the app can help the user fix their file # in this case, num_unmatched_students is guaranteed to be an int instead of None self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) + artifact.UNMATCHED_STUDENTS.needs_upload = True self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): From bdee7696d0b31f4b5b0f91849aac0eb8812fda3b Mon Sep 17 00:00:00 2001 From: johncmerfeld Date: Mon, 1 Jun 2026 17:45:47 -0500 Subject: [PATCH 20/20] fix cross-year + no-ODS case --- executor/executor/executor.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 93971283..e92424f3 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -282,7 +282,7 @@ def get_student_roster(self): elif self.cross_year_match_available: # Case 2: not sending to this year's ODS but we have access to EDU; # only running Earthmover once with cross-year roster - self.get_roster_from_edu() + self.get_roster_from_edu(artifact.ROSTER.path) else: # Case 3: not sending to this year's ODS and EDU is unavailable; # only running Earthmover once with uploaded roster @@ -290,10 +290,11 @@ def get_student_roster(self): self.upload_artifact(artifact.ROSTER) - def get_roster_from_edu(self): - """Query EDU via the Runway app and load cross-year roster data into a JSONL file""" + def get_roster_from_edu(self, dest_path): + """Query EDU via the Runway app and stream cross-year roster data into the given JSONL file""" self.logger.info(f"cross-year pass: streaming cross-year roster") - dest_path = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) + dest_path = os.path.abspath(dest_path) + os.makedirs(os.path.dirname(dest_path), exist_ok=True) try: stream_to_file(self.conn, self.cross_year_roster_url, dest_path) except requests.exceptions.RequestException: @@ -536,7 +537,7 @@ def cross_year_pass(self, primary): os.environ["INPUT_FILE"] = unmatched_path self.input_sources["INPUT_FILE"]["path"] = unmatched_path - self.get_roster_from_edu() + self.get_roster_from_edu(config.CROSS_YEAR_ROSTER_PATH) os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) # Constrain to the ID column the first pass matched on. The bundle always appends