diff --git a/executor/executor/artifacts.py b/executor/executor/artifacts.py index 9af4b40..01b92e3 100644 --- a/executor/executor/artifacts.py +++ b/executor/executor/artifacts.py @@ -8,10 +8,10 @@ import executor.config as config class JobArtifact: - def __init__(self, name, path): + def __init__(self, name, path, needs_upload=True): self.name = name self.path = path - self.needs_upload = True + self.needs_upload = needs_upload ROSTER = JobArtifact( @@ -22,17 +22,27 @@ def __init__(self, name, path): "earthmover_results", "em-results.json" ) +# Only generated when earthmover runs for a second time as part of cross-year ID matching. +# If a cross-year match is performed as part of a sideload-only job, EM_RESULTS is used alone +EM_RESULTS_X_YEAR = JobArtifact( + "earthmover_results_x_year", + "em-results-x-year.json", + False +) MATCH_RATES = JobArtifact( "student_id_match_rates", os.path.join(config.OUTPUT_DIR, ("student_id_match_rates.csv")), ) +# Uploading the unmatched students file impacts the UX; we must be careful to only do so +# in the event of an otherwise successful run that has a tolerable number of unmatched input records UNMATCHED_STUDENTS = JobArtifact( "unmatched_students", os.path.join(config.OUTPUT_DIR, "input_no_student_id_match.csv"), + False ) LB_SEND_RESULTS = JobArtifact( "lightbeam_send_results", "lb-send-results.json" ) -ALL = [ROSTER, EM_RESULTS, MATCH_RATES, UNMATCHED_STUDENTS, LB_SEND_RESULTS] \ No newline at end of file +ALL = [ROSTER, EM_RESULTS, EM_RESULTS_X_YEAR, MATCH_RATES, UNMATCHED_STUDENTS, LB_SEND_RESULTS] \ No newline at end of file diff --git a/executor/executor/config.py b/executor/executor/config.py index 0974072..469d905 100644 --- a/executor/executor/config.py +++ b/executor/executor/config.py @@ -1,6 +1,9 @@ BUNDLE_DIR = 'bundles' OUTPUT_DIR = 'output' +# when performing a cross-year-matching second pass: location to move earthmover output set from the first pass +OUTPUT_DIR_FIRST_RUN = 'output-first-run' ROSTER_DOWNLOAD_DIR = 'roster-download-dir' +CROSS_YEAR_ROSTER_PATH = 'cross_year_roster.jsonl' REQUIRED_ID_MATCH_RATE = 0.5 STUDENT_ASSESSMENT_FAIL_THRESHOLD = 0.75 diff --git a/executor/executor/errors.py b/executor/executor/errors.py index 25742ee..ed819ff 100644 --- a/executor/executor/errors.py +++ b/executor/executor/errors.py @@ -80,6 +80,10 @@ class EarthmoverRunError(ExecutorError): def __init__(self, stacktrace=None): super().__init__("earthmover_run", stacktrace) +class CrossYearRosterFetchError(ExecutorError): + def __init__(self, stacktrace=None): + super().__init__("cross_year_roster_fetch", stacktrace) + class InsufficientMatchesError(ExecutorError): def __init__(self, match_rate, match_threshold, id_name, id_type, stacktrace=None): self.match_rate = match_rate diff --git a/executor/executor/executor.py b/executor/executor/executor.py index 7a8dc86..e92424f 100644 --- a/executor/executor/executor.py +++ b/executor/executor/executor.py @@ -21,6 +21,7 @@ import executor.artifacts as artifact import executor.config as config import executor.errors as error +from executor.output_sets import OutputSet handler = logging.StreamHandler() _formatter = logging.Formatter( @@ -59,8 +60,10 @@ def __init__(self): self.local_mode = os.environ.get("DEPLOYMENT_MODE") == "LOCAL" self.conn = requests.Session() - # wipe lightbeam state so that local runs stay idempotent + # wipe state left behind by a prior local run so reruns are idempotent shutil.rmtree(".lightbeam", ignore_errors=True) + shutil.rmtree(config.OUTPUT_DIR_FIRST_RUN, ignore_errors=True) + shutil.rmtree(config.ROSTER_DOWNLOAD_DIR, ignore_errors=True) self.output_dir = os.path.abspath(config.OUTPUT_DIR) os.mkdir(self.output_dir) @@ -113,11 +116,11 @@ def execute(self): self.get_input_files() self.map_descriptors() - self.earthmover_run() + self.orchestrate_earthmover() - if self.send_to_ods: - self.lightbeam_send() + self.lightbeam_send() + self.report_unmatched_students() self.upload_output() self.compile_summary() @@ -168,6 +171,9 @@ def unpack_job(self, job): self.output_files_url = job["appUrls"]["outputFiles"] self.send_to_ods = job.get("sendToOds", True) + self.cross_year_match_available = job.get("crossYearMatchAvailable", False) + if self.cross_year_match_available: + self.cross_year_roster_url = job["appUrls"]["roster"] self.assessment_project = os.path.join( self.wrapper_project, "packages", *job["bundle"]["path"].split("/")[1:] @@ -182,8 +188,9 @@ def unpack_job(self, job): if not self.send_to_ods: self.logger.info("this job is not sending Earthmover output set to an ODS") artifact.LB_SEND_RESULTS.needs_upload = False - # if bypassing the ODS, a roster file is required, for now - self.roster_file_path = job["rosterFilePath"] + if not self.cross_year_match_available: + # If bypassing the ODS and not pulling from EDU, require that a roster file exists in S3 + self.roster_file_path = job["rosterFilePath"] else: # Note that we always need the API_YEAR env var set in order to run Earthmover. # We also use it in cases when we are using an ODS - in such cases the year used by EM @@ -265,16 +272,39 @@ def modify_local_lightbeam(self): ) def get_student_roster(self): - """Download a list of students so they can be used to match IDs""" + """Download a list of students so they can be used to match IDs in the initial Earthmover run""" self.set_action(action.GET_ROSTER) if self.send_to_ods: + # Case 1: initially attempt to match on current year; + # leave open the cross-year option if we take another pass self.get_roster_from_ods() + elif self.cross_year_match_available: + # Case 2: not sending to this year's ODS but we have access to EDU; + # only running Earthmover once with cross-year roster + self.get_roster_from_edu(artifact.ROSTER.path) else: + # Case 3: not sending to this year's ODS and EDU is unavailable; + # only running Earthmover once with uploaded roster self.get_roster_from_s3() self.upload_artifact(artifact.ROSTER) + def get_roster_from_edu(self, dest_path): + """Query EDU via the Runway app and stream cross-year roster data into the given JSONL file""" + self.logger.info(f"cross-year pass: streaming cross-year roster") + dest_path = os.path.abspath(dest_path) + os.makedirs(os.path.dirname(dest_path), exist_ok=True) + try: + stream_to_file(self.conn, self.cross_year_roster_url, dest_path) + except requests.exceptions.RequestException: + self.error = error.CrossYearRosterFetchError() + raise + + if os.stat(dest_path).st_size == 0: + self.error = error.CrossYearRosterFetchError() + raise ValueError("Cross-year roster is empty") + def get_roster_from_ods(self): """Fetch student roster from the ODS via lightbeam""" try: @@ -387,33 +417,66 @@ def map_descriptors(self): self.logger.debug(f"Mapped {len(descriptors)} {name} values") - def earthmover_run(self): - """Run an Earthmover bundle to transform assessment data""" + def orchestrate_earthmover(self): + """Run an Earthmover bundle to transform assessment data. + + Conditionally run Earthmover a second time on input records that initially failed to match + """ self.set_action(action.EARTHMOVER_RUN) + # first pass self.unpack_id_types() os.environ["EDFI_STUDENT_ID_TYPES"] = ",".join(self.distinct_id_types) self.logger.info(f"Student ID types in Ed-Fi roster: {os.environ['EDFI_STUDENT_ID_TYPES']}") + self.earthmover_run(artifact.EM_RESULTS.path) + self.upload_artifact(artifact.EM_RESULTS) + self.record_highest_match_rate() + self.enforce_match_threshold() + + self.output_sets = [OutputSet( + local_dir=self.output_dir, + s3_subdir="ods" if self.send_to_ods else "non-ods", + sent_to_ods=self.send_to_ods, + em_results_path=artifact.EM_RESULTS.path, + lb_send_results_path=artifact.LB_SEND_RESULTS.path if self.send_to_ods else None, + )] + + # if we're here, the first pass of Earthmover was successful + if (self.send_to_ods # i.e. we've only tried matching this year's students so far + and self.cross_year_match_available # and we have access to EDU + and self.num_unmatched_students > 0 # and there are unmatched students from the first pass + ): + # then take a second pass with the cross-year roster from EDU + # and thus produce a second output set to be sideloaded + cross_year_output = self.cross_year_pass(self.output_sets[0]) + self.output_sets.append(cross_year_output) + + self.upload_artifact(artifact.MATCH_RATES) + + def earthmover_run(self, results_path): + """Compile and run Earthmover into the given results directory.""" self.check_input_encoding() - fatal = False + if self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: + encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) + else: + encoding = None + encoding_args = ["--set", "sources.input.encoding", encoding] if encoding else [] + if encoding: + self.logger.info(f"using input encoding: {encoding}") + fatal = False try: - encoding_mod = [] - if self.input_sources["INPUT_FILE"]["is_plausible_non_utf8"]: - # if chardet identified an encoding that we think is plausible, use it. - # Otherwise, do nothing and default to UTF-8 for the first attempt - encoding = str.lower(self.input_sources["INPUT_FILE"]["encoding"]) - self.logger.info(f"setting input encoding to {encoding}") - encoding_mod.extend(["--set", "sources.input.encoding", encoding]) - - subprocess.run( - ["earthmover", "-c", self.wrapper_earthmover, "compile"] - ).check_returncode() + em = subprocess.run( + ["earthmover", "-c", self.wrapper_earthmover, "compile"], + capture_output=True, + text=True + ) + em.check_returncode() # attempt no. 1 - cmd = ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", artifact.EM_RESULTS.path] - cmd.extend(encoding_mod) + cmd = ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", results_path] + cmd.extend(encoding_args) em = subprocess.run( cmd, capture_output=True, @@ -426,17 +489,17 @@ def earthmover_run(self): self.logger.info(f"earthmover stderr: {em.stderr}") em.check_returncode() except subprocess.CalledProcessError as err: - self.logger.warning("earthmover encountered an error") + self.logger.error("earthmover encountered an error") fatal = True # yes it's brittle to check the error against a string like this, but this message hasn't # changed since 2007(!) -> https://github.com/python/cpython/blame/main/Objects/exceptions.c - if err.stderr and "codec can't decode" in err.stderr and self.input_sources["INPUT_FILE"]["encoding"] != "ISO-8859-1": - self.logger.error(f"Failed to read file with {self.input_sources['INPUT_FILE']['encoding']} encoding. Retrying with Latin1...") + if err.stderr and "codec can't decode" in err.stderr and encoding != "iso-8859-1": + self.logger.error(f"Failed to read file with {encoding} encoding. Retrying with Latin1...") try: # attempt no. 2 - need a new em object to overwrite the decoding error em = subprocess.run( - ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", artifact.EM_RESULTS.path, "--set", "sources.input.encoding", "iso-8859-1"], + ["earthmover", "-c", self.wrapper_earthmover, "run", "--results-file", results_path, "--set", "sources.input.encoding", "iso-8859-1"], ) em.check_returncode() @@ -446,26 +509,73 @@ def earthmover_run(self): pass if fatal: - # It is possible that Earthmover successfully matches students but then + # It is possible that Earthmover successfully matches students but then # fails during data transformation. This is most likely to happen when the user # uploads a file that is recognizable as the correct assessment but has some # other flaw - for example, the file is from the wrong year. - # In this case. we end up with a match rates file and unmatched students file, - # but we don't want to send either of them to the app. All the user needs to know - # is that the run failed. + # In this case. we end up with a match rates file, but we don't want to send it + # to the app. All the user needs to know is that the run failed. artifact.MATCH_RATES.needs_upload = False - artifact.UNMATCHED_STUDENTS.needs_upload = False # Anyway, yeah, the run failed. Shut it down. self.error = error.EarthmoverRunError() # generic exception that will be caught, with em.stderr reported as the stacktrace raise Exception(em.stderr) - self.record_highest_match_rate() - self.upload_artifact(artifact.EM_RESULTS) - self.upload_artifact(artifact.MATCH_RATES) + def cross_year_pass(self, primary): + """Run a second Earthmover pass on unmatched students using a cross-year roster in an attempt to match more students.""" + # constrain this pass to use the IDs that matched best in the first pass + first_run_id_name = self.highest_match_id_name + first_run_id_type = self.highest_match_id_type - # If we reach this point, it's likely that the input file was basically compatible with the bundle - self.report_unmatched_students() + first_run_output_dir = os.path.abspath(config.OUTPUT_DIR_FIRST_RUN) + os.rename(self.output_dir, first_run_output_dir) + primary.local_dir = first_run_output_dir + os.mkdir(self.output_dir) + + # use only the students who failed to match the primary ID from the first run + unmatched_path = os.path.join(first_run_output_dir, os.path.basename(artifact.UNMATCHED_STUDENTS.path)) + os.environ["INPUT_FILE"] = unmatched_path + self.input_sources["INPUT_FILE"]["path"] = unmatched_path + + self.get_roster_from_edu(config.CROSS_YEAR_ROSTER_PATH) + os.environ["EDFI_ROSTER_FILE"] = os.path.abspath(config.CROSS_YEAR_ROSTER_PATH) + + # Constrain to the ID column the first pass matched on. The bundle always appends + # studentUniqueId internally, so we pass an empty list when that's what won. + os.environ["POSSIBLE_STUDENT_ID_COLUMNS"] = first_run_id_name + os.environ["EDFI_STUDENT_ID_TYPES"] = ( + "" if first_run_id_type == "studentUniqueId" else first_run_id_type + ) + # we already know which ID to use so we should succeed no matter how many failed matches remain + os.environ["REQUIRED_ID_MATCH_RATE"] = "0.0" + self.logger.info(f"cross-year pass: matching on {first_run_id_name} ({first_run_id_type} ID)") + + self.earthmover_run(artifact.EM_RESULTS_X_YEAR.path) + artifact.EM_RESULTS_X_YEAR.needs_upload = True + self.upload_artifact(artifact.EM_RESULTS_X_YEAR) + + count = count_unmatched_students() + if count is None: + # Edge case alert! It may be that in the second pass, there are no matches even with the + # expanded universe of students. Of course we need to return these rows to the user and tell + # them how many records failed to match. However, with no matches, our usual method of counting + # via the match rates file breaks down, because that file is now empty. So here we are saying + # "if there were no matches on the second pass, use the number of unmatched from the first pass" + self.logger.warning("cross-year pass: no additional matches. Falling back to original unmatched students file") + count = self.num_unmatched_students + else: + self.logger.warning(f"cross-year pass: {count} unmatched students remain") + + # then, in either case, we do the usual thing: upload the unmatched students file if and only if + # there are unmatched students + self.num_unmatched_students = count + + return OutputSet( + local_dir=self.output_dir, + s3_subdir="non-ods", + sent_to_ods=False, + em_results_path=artifact.EM_RESULTS_X_YEAR.path, + ) def check_input_encoding(self): """Determine whether assessment file should be loaded with a non-UTF-8 encoding""" @@ -583,82 +693,81 @@ def record_highest_match_rate(self): # local ID, then the 4/10 without matching state IDs go in the unmatched students file, but both types # of match are recorded in the match rates file. Both are used for different purposes by the app and # within the exeuctor - # - # There are three circumstances under which we don't upload the unmatched students file and thus don't - # show the "some students failed to match" message to the user. These are: - # 1. Earthmover failed, so we don't know how many students matched - # 2. The match rates file tells us that there is a perfect match, so there is nothing to upload - # 3. The match rates file is empty, so there were no matches and the unmatched students file is the same as the original input + # Why do we use the match rates file to count unmatched students? Because the unmatched students file + # may have multiple header rows, so it's hard to use it for counting: + # ref: https://github.com/edanalytics/runway/pull/6 # in the case of literally zero matches (or a failed run), the file is empty and these defaults remain self.highest_match_rate = 0.0 self.highest_match_id_name = "N/A" self.highest_match_id_type = "N/A" - # but if the file is empty, we don't learn this number. We need to distinguish between 0 and "don't know" + # we set this to distinguish from the case where the run succeeded and num_unmatched_students is 0. + # If this value is not properly set then enforce_match_threshold fails, as intended self.num_unmatched_students = None - try: - with open(artifact.MATCH_RATES.path) as f: - match_rates = [ - {k: v for k, v in row.items()} - for row in csv.DictReader(f, skipinitialspace=True) - ] - except FileNotFoundError: - # case 1 - self.logger.debug("failed Earthmover run. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False + match_rates = load_match_rates() + if len(match_rates) == 0: + # either + # - Earthmover failed and so we don't report unmatched students + # - OR no students matched, which in the first pass means we've hit a more fundamental error + # in both cases we usually have already exited out of the run prior to now, but we retain + # this as a guardrail return - if len(match_rates) > 0: - self.logger.info(f"at least some records matched - match rates by ID: {match_rates}") + self.logger.info(f"at least some records matched - match rates by ID: {match_rates}") + highest_match = sorted(match_rates, reverse=True, key=lambda mr: float(mr['match_rate']))[0] + self.highest_match_rate = float(highest_match["match_rate"]) + self.highest_match_id_name = highest_match["source_column_name"] + self.highest_match_id_type = highest_match["edfi_column_name"] + self.num_unmatched_students = int(highest_match["num_rows"]) - int(highest_match["num_matches"]) - highest_match = sorted(match_rates, reverse=True, key=lambda mr: float(mr['match_rate']))[0] - self.highest_match_rate = float(highest_match["match_rate"]) - self.highest_match_id_name = highest_match["source_column_name"] - self.highest_match_id_type = highest_match["edfi_column_name"] - self.num_unmatched_students = int(highest_match["num_rows"]) - int(highest_match["num_matches"]) + if self.num_unmatched_students == 0: + self.logger.debug("all input records matched") - if self.num_unmatched_students == 0: - # case 2 - self.logger.debug("all records matched. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False - else: - # case 3 - self.logger.debug("no students matched any ID. Skipping upload of unmatched students file") - artifact.UNMATCHED_STUDENTS.needs_upload = False + def enforce_match_threshold(self): + """Halt if the primary Earthmover run's best match rate is below the configured threshold""" + if self.num_unmatched_students == 0: + return + if self.highest_match_rate >= config.REQUIRED_ID_MATCH_RATE: + return + + self.error = error.InsufficientMatchesError( + self.highest_match_rate, config.REQUIRED_ID_MATCH_RATE, + self.highest_match_id_name, self.highest_match_id_type, + ) + # For now, since we're asking the user to revisit their entire file, it's simpler + # if we don't return the unmatched students file at all + self.logger.debug("too many unmatched students. Halting run") + raise ValueError(f"insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") def report_unmatched_students(self): - """Alert the app to the existence of unmatched students (if any) and the best candidate for ID matching""" + """At the end of a successful run, alert the app to any unmatched students that remain after all Earthmover passes. + """ if self.num_unmatched_students == 0: return self.logger.warning('earthmover run failed to match some student IDs') - if self.highest_match_rate >= config.REQUIRED_ID_MATCH_RATE: - # in this case, there are unmatched students but not so many that we doubt the - # integrity of the file. Send the ones that match and give the rest back to the user - # if an "actual" ID is replicated as studentUniqueId, we should send the actual ID to the user - id_type_to_report = self.highest_match_id_type - if self.highest_match_id_type == "studentUniqueId" and self.stu_unique_id_in_roster: - id_type_to_report = self.stu_unique_id_in_roster - - # additional context so the app can help the user fix their file - # in this case, num_unmatched_students is guaranteed to be an int instead of None - self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) - self.upload_artifact(artifact.UNMATCHED_STUDENTS) - else: - # Insufficient matches. Assume the input file is no good Don't bother uploading anything. - # Instead, alert the user with an error - self.error = error.InsufficientMatchesError(self.highest_match_rate, config.REQUIRED_ID_MATCH_RATE, self.highest_match_id_name, self.highest_match_id_type) - # For now, since we're asking the user to revisit their entire file, it's simpler if we don't - # return the unmatched students file at all - self.logger.debug("too many unmatched students. Skipping upload") - artifact.UNMATCHED_STUDENTS.needs_upload = False - raise ValueError(f"insufficient ID matches to continue (highest rate {self.highest_match_rate} < required {config.REQUIRED_ID_MATCH_RATE}; ID column name: {self.highest_match_id_name}; Ed-Fi ID type: {self.highest_match_id_type})") + # if an "actual" ID is replicated as studentUniqueId, we should send the actual ID to the user + id_type_to_report = self.highest_match_id_type + if self.highest_match_id_type == "studentUniqueId" and self.stu_unique_id_in_roster: + id_type_to_report = self.stu_unique_id_in_roster + + # additional context so the app can help the user fix their file + # in this case, num_unmatched_students is guaranteed to be an int instead of None + self.send_id_matches(self.highest_match_id_name, id_type_to_report, self.num_unmatched_students) + artifact.UNMATCHED_STUDENTS.needs_upload = True + self.upload_artifact(artifact.UNMATCHED_STUDENTS) def lightbeam_send(self): - """Upload Earthmover's outputs to the ODS""" + """Upload an Earthmover output set to the ODS via lightbeam.""" + + if not self.send_to_ods: + return + self.set_action(action.LIGHTBEAM_SEND) + # If we ran Earthmover twice, we're only ever sending the first output set + os.environ["DATA_DIR"] = self.output_sets[0].local_dir try: lb = subprocess.run( ["lightbeam", "-c", self.assessment_lightbeam, "send", "--results-file", artifact.LB_SEND_RESULTS.path] @@ -688,50 +797,41 @@ def lightbeam_send(self): raise ValueError("all output data failed to send") def compile_summary(self): - """Post results from job to share with the user""" - if self.send_to_ods: - lb_send_results = {} - with open(artifact.LB_SEND_RESULTS.path) as f: - lb_send_results = json.load(f) - - required_counts = ["records_processed", "records_skipped", "records_failed"] - for resource, counts in lb_send_results["resources"].items(): - # grab the counts we care about - self.summary[resource] = { - key: val for key, val in counts.items() if key in required_counts - } - else: - # no lightbeam send, use Earthmover stats for now - with open(artifact.EM_RESULTS.path) as f: - em_results = json.load(f) - - dest_prefix = "$destinations." - for key, count in em_results.get("row_counts", {}).items(): - if key.startswith(dest_prefix): - resource = key[len(dest_prefix):] - self.summary[resource] = { - "records_processed": count, + """Send per-resource records-processed/skipped/failed counts to the app.""" + summary = {} + for set in self.output_sets: + for resource, counts in set.counts().items(): + if resource not in summary: + summary[resource] = { + "records_processed": 0, "records_skipped": 0, "records_failed": 0, } + for k in summary[resource]: + summary[resource][k] += counts.get(k, 0) - if self.summary: + if summary: + self.summary = summary self.send_job_summary() def upload_output(self): - """Upload Earthmover output set to S3 and notify the app of their location""" + """Upload each Earthmover output set to S3 and notify the app of its location.""" self.set_action(action.UPLOAD_OUTPUT) + for set in self.output_sets: + self.upload_output_set(set) - output_type = "ods" if self.send_to_ods else "non-ods" - s3_subdir = f"{self.s3_out_path}/{output_type}" - + def upload_output_set(self, output_set): + """Upload all non-empty JSONL files in one OutputSet's local_dir to its S3 subdir, then alert the app.""" + s3_prefix = f"{self.s3_out_path}/{output_set.s3_subdir}" uploaded = False - for fname in os.listdir(self.output_dir): - fpath = os.path.join(self.output_dir, fname) + for fname in os.listdir(output_set.local_dir): + if not fname.endswith(".jsonl"): + continue + fpath = os.path.join(output_set.local_dir, fname) if not os.path.isfile(fpath) or os.stat(fpath).st_size == 0: continue - dest_fname = f"{s3_subdir}/{fname}" + dest_fname = f"{s3_prefix}/{fname}" self.logger.info(f"uploading output: {fname} -> {dest_fname}") try: self.s3.upload_file(fpath, self.app_bucket, dest_fname) @@ -741,10 +841,10 @@ def upload_output(self): uploaded = True if not uploaded: - self.logger.warning("no earthmover output files found to upload") + self.logger.warning(f"no Earthmover output files found in {output_set.local_dir} to upload") return - self.send_job_output_alert(s3_subdir) + self.send_job_output_alert(s3_prefix, output_set.sent_to_ods) def upload_remaining_artifacts(self): """Attempt to upload all artifacts that have not yet been uploaded""" @@ -843,11 +943,11 @@ def send_job_summary(self): self.logger.debug(f"Sending summary") self.conn.post(self.summary_url, json=self.summary) - def send_job_output_alert(self, s3_prefix): + def send_job_output_alert(self, s3_prefix, sent_to_ods): """Notify the app that an Earthmover output set has been uploaded to S3""" self.logger.debug(f"Notifying app of output set at {s3_prefix}") self.conn.post(self.output_files_url, json={ - "sentToOds": self.send_to_ods, + "sentToOds": sent_to_ods, "path": s3_prefix, }) @@ -855,3 +955,43 @@ def send_job_output_alert(self, s3_prefix): def localize_s3_path(path): """Convert an S3 'path' to a single filename""" return path.replace("/", "__") + + +def load_match_rates(): + """Read the latest Earthmover run's match_rates.csv. Returns an empty list when the file is missing or has no data rows.""" + try: + with open(artifact.MATCH_RATES.path) as f: + return list(csv.DictReader(f, skipinitialspace=True)) + except FileNotFoundError: + return [] + + +def count_unmatched_students(): + """Number of unmatched students reported by the latest Earthmover run's match rates file + + Returns None when the match rates file is empty, signifying no matches or a failed run + """ + rows = load_match_rates() + if len(rows) == 0: + return None + return int(rows[0]["num_rows"]) - int(rows[0]["num_matches"]) + + +def stream_to_file(session, url, dest_path, max_attempts=3): + """GET url as a stream and write the body to dest_path""" + for attempt in range(1, max_attempts + 1): + try: + with session.get(url, stream=True) as resp: + resp.raise_for_status() + with open(dest_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=64 * 1024): + if chunk: + f.write(chunk) + return + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as e: + if attempt == max_attempts: + raise + sleep_seconds = attempt * 5 + print(f"stream_to_file: attempt {attempt}/{max_attempts} failed ({type(e).__name__}); retrying in {sleep_seconds}s") + time.sleep(sleep_seconds) diff --git a/executor/executor/output_sets.py b/executor/executor/output_sets.py new file mode 100644 index 0000000..151d310 --- /dev/null +++ b/executor/executor/output_sets.py @@ -0,0 +1,41 @@ +# An OutputSet describes one collection of Earthmover output files that a job +# produces and uploads to S3. Every job produces at least one output set, which +# may or may not be sent to the ODS. Jobs that perform cross-year matching produce +# two output sets. + +import json + + +class OutputSet: + def __init__(self, local_dir, s3_subdir, sent_to_ods, em_results_path, lb_send_results_path=None): + # Local directory containing the output files Earthmover produced for this set. + self.local_dir = local_dir + # Subdirectory under the job's S3 output path that these files land in. + # The app reads this to distinguish ODS-bound output from sideloaded output. + self.s3_subdir = s3_subdir + # Whether the records in this set were (or will be) sent to an ODS via lightbeam. + self.sent_to_ods = sent_to_ods + # Path to the Earthmover results JSON for the run that produced this set. + self.em_results_path = em_results_path + # Path to the lightbeam-send results JSON, if sent_to_ods + self.lb_send_results_path = lb_send_results_path + + def counts(self): + """Per-resource records-processed/skipped/failed counts contributed by this set. + + For ODS-bound sets, lightbeam's send-results are authoritative (they know + what the ODS actually accepted). For sideloaded sets, Earthmover row counts + are the best signal we have. + """ + if self.sent_to_ods: + with open(self.lb_send_results_path) as f: + return json.load(f)["resources"] + + with open(self.em_results_path) as f: + em_results = json.load(f) + dest_prefix = "$destinations." + return { + key[len(dest_prefix):]: {"records_processed": count} + for key, count in em_results.get("row_counts", {}).items() + if key.startswith(dest_prefix) + }