feat: ft_launcher integration of log analysis for restart decisions #293
namitdhameja wants to merge 6 commits into
Conversation
Greptile Summary
This PR integrates log-analysis attribution (MCP subprocess via LogSage and HTTP URL backends) into the fault-tolerance launcher's restart decision, replacing the former single
Confidence Score: 3/5
Multiple unresolved P1/security issues from prior review rounds plus a new P1 credential-leak in to_yaml_file make this unsafe to merge without fixes. Score of 3 reflects one new confirmed P1/security finding (to_yaml_file credential exposure), plus several unresolved prior-review P1 and P0 issues affecting correctness and security.
src/nvidia_resiliency_ext/fault_tolerance/config.py (to_yaml_file credential leak), src/nvidia_resiliency_ext/fault_tolerance/launcher.py (timeout default, min-healthy bypass, credential in rdzv_configs log)
| Filename | Overview |
|---|---|
| src/nvidia_resiliency_ext/fault_tolerance/ft_attribution.py | New file: multi-backend attribution client for MCP and HTTP URL backends. |
| src/nvidia_resiliency_ext/fault_tolerance/config.py | Adds SlackConfig dataclass; to_yaml_file exposes auth field via dataclasses.asdict (P1/security). |
| src/nvidia_resiliency_ext/fault_tolerance/launcher.py | Major rework; unresolved issues: timeout default, min-healthy bypass, credential logging. |
| src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py | Replaces AttributionService with LogAnalysisClient; adds rollback and timing support. |
| src/nvidia_resiliency_ext/attribution/log_analyzer/runner.py | Adds notify_log_path_sync and extends run_log_analysis_sync with new kwargs. |
| src/nvidia_resiliency_ext/attribution/api_keys.py | Renames load_nvidia_api_key to load_llm_api_key and updates env var names. |
| tests/attribution/unit/test_api_keys.py | Tests updated for renamed function and new LLM_API_KEY_FILE file-read case. |
Sequence Diagram
sequenceDiagram
participant L as Launcher (store host)
participant P as Healthy Peers
participant A as Attribution Backends (MCP / HTTP)
participant R as RendezvousBarrier
L->>R: _increment_peer_aborted_count()
P-->>R: observe peer_aborted_increased=True
P->>P: _handle_restart_decision (notify_peer=False)
L->>A: should_stop(cycle_log_file) [parallel fetch]
alt Attribution says STOP
A-->>L: True
L->>R: _undo_peer_abort_notify() [best-effort]
L->>R: set_permanently_closed()
L->>L: _graceful_stop_requested=True
P->>R: next_rendezvous() → RendezvousClosedError
else Attribution says RESTART
A-->>L: False
L->>R: _open_rendezvous_for_restart()
L->>L: _restart_workers()
P->>R: next_rendezvous() → joins new round
end
Comments Outside Diff (1)
- src/nvidia_resiliency_ext/fault_tolerance/config.py, line 447-451: to_yaml_file writes SlackConfig auth field in cleartext
dataclasses.asdict(self) recursively serializes all nested dataclasses verbatim, including SlackConfig. If the Slack auth value was loaded from a path file by FaultToleranceConfig.from_args, the resolved string appears in the saved YAML in cleartext, turning a secure file reference into an inline credential on disk.
The fix is to replace the slack entry in the intermediate dict with the result of SlackConfig.to_dict() using the redacted mode (presence flag, not raw value) before calling yaml.dump.
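A minimal sketch of the redaction described above, using stand-in dataclasses rather than the real SlackConfig and FaultToleranceConfig. The field names (bot_token, channel), the include_secrets flag, and the to_safe_dict helper are assumptions made for illustration; the PR's actual signatures may differ.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SlackConfigSketch:
    # Hypothetical stand-in for SlackConfig; field names are assumed.
    bot_token: Optional[str] = None
    channel: Optional[str] = None

    def to_dict(self, include_secrets: bool = False) -> Dict[str, Any]:
        if include_secrets:
            return dataclasses.asdict(self)
        # Redacted mode: record only whether a token is present.
        return {"channel": self.channel, "bot_token_set": bool(self.bot_token)}


@dataclass
class FtConfigSketch:
    # Hypothetical stand-in for FaultToleranceConfig.
    attribution_timeout_seconds: int = 60
    slack: SlackConfigSketch = field(default_factory=SlackConfigSketch)

    def to_safe_dict(self) -> Dict[str, Any]:
        # asdict copies nested dataclasses verbatim, raw token included.
        data = dataclasses.asdict(self)
        # Overwrite the nested entry with its redacted form before serializing.
        data["slack"] = self.slack.to_dict(include_secrets=False)
        return data


cfg = FtConfigSketch(slack=SlackConfigSketch(bot_token="xoxb-secret", channel="#alerts"))
safe = cfg.to_safe_dict()
```

The resulting dict can then be passed to yaml.dump without the token ever reaching disk, while the presence flag still tells a reader that Slack was configured.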
Reviews (12): Last reviewed commit: "graceful_stop_requested"
| state_in_rdzv = rdzv_handler.try_set_worker_state(state) | ||
| if state_in_rdzv != state: | ||
| assert ( | ||
| state_in_rdzv == WorkerState.UNKNOWN | ||
| ), f"Could not set worker group state {state=} {state_in_rdzv=}" | ||
| # state in the rdzv is UNKNOWN if this node was marked as dead by other participants | ||
| else: | ||
| raise RuntimeError(f"[{role}] Worker group in unexpected state: {state.name}") | ||
|
|
||
| # count the number of worker groups in each state | ||
| all_worker_states = rdzv_handler.get_worker_states() |
min-healthy crashes with barrier rendezvous
_invoke_run_with_min_healthy_policy calls rdzv_handler.try_set_worker_state (line 783) and rdzv_handler.get_worker_states (line 793), but FtRendezvousBarrierHandler implements neither — they exist only on FtRendezvousHandler (legacy). Any user who sets --ft-restart-policy min-healthy --ft-rdzv-impl barrier will get an AttributeError at the first worker failure. There is no validation or early error to prevent or warn about this combination.
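One way to surface this combination early is a startup check, sketched below. The function name validate_restart_policy is hypothetical, and the "legacy" value used in the usage note is an assumed name for the non-barrier implementation; only "min-healthy" and "barrier" come from the flags quoted in the comment.

```python
# Policies that rely on try_set_worker_state/get_worker_states, which
# (per the review) only the legacy rendezvous handler implements.
LEGACY_ONLY_POLICIES = {"min-healthy"}


def validate_restart_policy(restart_policy: str, rdzv_impl: str) -> None:
    """Raise at launch time instead of failing at the first worker fault."""
    if restart_policy in LEGACY_ONLY_POLICIES and rdzv_impl == "barrier":
        raise ValueError(
            f"--ft-restart-policy {restart_policy} is not supported with "
            f"--ft-rdzv-impl {rdzv_impl}: the barrier handler does not implement "
            "try_set_worker_state/get_worker_states"
        )
```

Calling validate_restart_policy("min-healthy", "barrier") during argument parsing would raise ValueError immediately, whereas validate_restart_policy("min-healthy", "legacy") would pass.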
| server_log_fd = open(grpc_server_log, 'w') | ||
|
|
||
| # Start gRPC server as subprocess with stdout/stderr redirected | ||
| # Note: No --output-file argument - clients specify file_path in each chunk | ||
| server_process = subprocess.Popen( | ||
| [ | ||
| sys.executable, '-m', | ||
| 'nvidia_resiliency_ext.shared_utils.grpc_log_server', | ||
| '--host', '0.0.0.0', | ||
| '--port', str(grpc_port), | ||
| '--max-workers', str(max_workers), | ||
| '--graceful-shutdown-timeout', str(graceful_shutdown_timeout), | ||
| ], | ||
| stdout=server_log_fd, | ||
| stderr=subprocess.STDOUT, # Redirect stderr to stdout (both go to same file) | ||
| ) | ||
|
|
||
| # Root must outlive leaves: leaves may wait the full leaf grace window for downstream | ||
| # clients before draining their queues to root; root's own grace must still be open then. | ||
| root_graceful_shutdown_timeout = graceful_shutdown_timeout * 2.0 | ||
|
|
||
| root_fd = _open_log(root_log) | ||
| try: | ||
| root_p = subprocess.Popen( | ||
| [ | ||
| sys.executable, | ||
| '-m', | ||
| 'nvidia_resiliency_ext.shared_utils.grpc_log_server', | ||
| '--host', | ||
| '0.0.0.0', | ||
| '--port', | ||
| str(root_port), | ||
| '--max-workers', | ||
| str(root_workers), | ||
| '--graceful-shutdown-timeout', | ||
| str(root_graceful_shutdown_timeout), | ||
| ], | ||
| stdout=root_fd, | ||
| stderr=subprocess.STDOUT, | ||
| ) | ||
| finally: | ||
| root_fd.close() | ||
| procs.append(root_p) | ||
| logger.info( | ||
| f"gRPC root log server: PID={root_p.pid}, port={root_port}, max_workers={root_workers}, " | ||
| f"graceful_shutdown_timeout_s={root_graceful_shutdown_timeout} (2× leaf {graceful_shutdown_timeout}s)" | ||
| f"gRPC log server started: PID={server_process.pid}, port={grpc_port}, " | ||
| f"max_workers={max_workers} (for {max_nodes} nodes)" | ||
| ) | ||
|
|
||
| upstream = f'localhost:{root_port}' | ||
| for i in range(n): | ||
| leaf_port = funnel_ports.leaf_listen_port(i) | ||
| leaf_log = _grpc_log_path(base_log_file, f'_grpc_leaf_{i}.log') | ||
| lf = _open_log(leaf_log) | ||
| try: | ||
| leaf_cmd = [ | ||
| sys.executable, | ||
| '-m', | ||
| 'nvidia_resiliency_ext.shared_utils.grpc_log_leaf_server', | ||
| '--host', | ||
| '0.0.0.0', | ||
| '--port', | ||
| str(leaf_port), | ||
| '--upstream', | ||
| upstream, | ||
| '--max-workers', | ||
| str(per_leaf), | ||
| '--max-queue-chunks', | ||
| str(max_queue), | ||
| '--graceful-shutdown-timeout', | ||
| str(graceful_shutdown_timeout), | ||
| ] | ||
| lp = subprocess.Popen( | ||
| leaf_cmd, | ||
| stdout=lf, | ||
| stderr=subprocess.STDOUT, | ||
| ) | ||
| finally: | ||
| lf.close() | ||
| procs.append(lp) | ||
| logger.info( | ||
| f"gRPC leaf log server {i}: PID={lp.pid}, port={leaf_port}, " | ||
| f"max_workers={per_leaf}, max_queue_chunks={max_queue}, upstream={upstream}" | ||
| ) | ||
| # Note: server_log_fd will be closed when server_process terminates | ||
| # Clients will wait for server readiness using their own health check retry logic | ||
| return server_process |
server_log_fd never closed in the parent process
server_log_fd is opened at line 3543 and passed to Popen, but the parent-side Python file object is never explicitly closed. The comment on line 3565 ("will be closed when server_process terminates") is incorrect — the subprocess only inherits a copy of the OS fd; the parent's Python object stays open until GC. The previous code used a try/finally to close it immediately after Popen:
server_log_fd = open(grpc_server_log, 'w')
try:
server_process = subprocess.Popen(
[...],
stdout=server_log_fd,
stderr=subprocess.STDOUT,
)
finally:
server_log_fd.close()
return server_process
| v = val.strip().lower() | ||
| if v == "lib": | ||
| return cls( | ||
| mode="lib", | ||
| timeout_seconds=timeout_seconds, | ||
| slack=slack, | ||
| dataflow_index=dataflow_index, | ||
| ) | ||
| if v == "mcp": | ||
| return cls( | ||
| mode="mcp", | ||
| timeout_seconds=timeout_seconds, | ||
| slack=slack, | ||
| dataflow_index=dataflow_index, | ||
| ) | ||
| url = _validate_attribution_url(v) |
URL path silently lowercased before storage
v = val.strip().lower() and then _validate_attribution_url(v) — the full input including any URL path is lowercased. A URL like http://host:8000/My-API is silently stored as http://host:8000/my-api. The .lower() is only needed for the "lib"/"mcp" mode comparisons; original case should be preserved for the URL:
| v = val.strip().lower() | |
| if v == "lib": | |
| return cls( | |
| mode="lib", | |
| timeout_seconds=timeout_seconds, | |
| slack=slack, | |
| dataflow_index=dataflow_index, | |
| ) | |
| if v == "mcp": | |
| return cls( | |
| mode="mcp", | |
| timeout_seconds=timeout_seconds, | |
| slack=slack, | |
| dataflow_index=dataflow_index, | |
| ) | |
| url = _validate_attribution_url(v) | |
| v = val.strip().lower() | |
| if v == "lib": | |
| return cls( | |
| mode="lib", | |
| timeout_seconds=timeout_seconds, | |
| slack=slack, | |
| dataflow_index=dataflow_index, | |
| ) | |
| if v == "mcp": | |
| return cls( | |
| mode="mcp", | |
| timeout_seconds=timeout_seconds, | |
| slack=slack, | |
| dataflow_index=dataflow_index, | |
| ) | |
| url = _validate_attribution_url(val.strip()) |
0ba228b to
d785e5e
Compare
d785e5e to
d2a9e36
Compare
| parser.add_argument( | ||
| "--ft-attrsvc-port", | ||
| "--ft_attrsvc_port", | ||
| "--ft-attribution-timeout", | ||
| "--ft_attribution_timeout", | ||
| type=int, | ||
| default=60, | ||
| dest="ft_attribution_timeout_seconds", | ||
| help="Attribution wait/timeout in seconds; skip result if exceeded (default: 60).", |
default=60 silently overrides YAML attribution_timeout_seconds
--ft-attribution-timeout uses default=60, so getattr(args, "ft_attribution_timeout_seconds", None) always returns 60 (never None). In FaultToleranceConfig.from_args, the if val is not None: guard treats this as an explicit CLI value and overwrites whatever was loaded from YAML. A user who sets attribution_timeout_seconds: 120 in their config file will always get 60 seconds instead.
All other FT timeout CLI args (e.g. --ft-initial-rank-heartbeat-timeout, --ft-rank-heartbeat-timeout) correctly use default=None to avoid this problem.
| parser.add_argument( | |
| "--ft-attrsvc-port", | |
| "--ft_attrsvc_port", | |
| "--ft-attribution-timeout", | |
| "--ft_attribution_timeout", | |
| type=int, | |
| default=60, | |
| dest="ft_attribution_timeout_seconds", | |
| help="Attribution wait/timeout in seconds; skip result if exceeded (default: 60).", | |
| parser.add_argument( | |
| "--ft-attribution-timeout", | |
| "--ft_attribution_timeout", | |
| type=int, | |
| default=None, | |
| dest="ft_attribution_timeout_seconds", | |
| help="Attribution wait/timeout in seconds; skip result if exceeded (default: 60).", | |
| ) |
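The precedence problem can be reproduced in isolation. The sketch below is not the launcher's code: resolve_timeout and the yaml_cfg dict are hypothetical stand-ins for FaultToleranceConfig.from_args and the loaded YAML, and only the flag name, dest, and the 60-second fallback are taken from the diff.

```python
import argparse

DEFAULT_TIMEOUT_S = 60  # fallback when neither CLI nor YAML sets a value


def resolve_timeout(argv, yaml_cfg):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--ft-attribution-timeout",
        type=int,
        default=None,  # None means "not given on the command line"
        dest="ft_attribution_timeout_seconds",
    )
    args = parser.parse_args(argv)
    if args.ft_attribution_timeout_seconds is not None:
        # An explicit CLI value takes precedence over YAML.
        return args.ft_attribution_timeout_seconds
    # Otherwise use the YAML value, then the built-in default.
    return yaml_cfg.get("attribution_timeout_seconds", DEFAULT_TIMEOUT_S)
```

With default=None, a YAML value of 120 survives when the flag is omitted. With default=60 in add_argument, the `is not None` branch would always be taken and the YAML value would never be read.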
d2a9e36 to
941980e
Compare
941980e to
3a0d298
Compare
| # Notify peers immediately so they can proceed with their own rendezvous | ||
| if notify_peer and hasattr(self._rdzv_handler, '_barrier_state'): | ||
| self._rdzv_handler._barrier_state._increment_peer_aborted_count() | ||
| if open_rendezvous: | ||
| self._open_rendezvous_for_restart() | ||
|
|
||
| start = time.time() | ||
| should_terminate_early = self._run_attribution() | ||
| if should_terminate_early: | ||
| if self._ft_cfg.attribution_dry_run: | ||
| logger.info( | ||
| "[%s] Attribution dry run: would NOT restart (attribution says stop), " | ||
| "but proceeding as configured (action not applied).", | ||
| role, | ||
| ) | ||
| else: | ||
| logger.error("[%s] Attribution says do not restart; will not restart.", role) | ||
| return False |
Rendezvous opened and peers notified before attribution decision
_open_rendezvous_for_restart() and _increment_peer_aborted_count() are called unconditionally at lines 617–619, before _run_attribution() is checked at line 622. If attribution returns True ("do not restart"), this function returns False (line 632) — but the rendezvous round is already open and peers have been told to expect an imminent restart.
The consequence: peers see peer_aborted_count > 0 and enter _handle_restart_decision(open_rendezvous=False), decide to restart, and attempt to join a rendezvous this node will never complete. If enough peers survive, training restarts on a subset of nodes despite attribution explicitly saying "stop". If not enough peers survive, the rendezvous stalls until timeout.
The original code (before this PR) correctly placed peer notification and rendezvous opening inside the self._remaining_restarts > 0 branch, i.e., only after all "should we restart?" checks passed. The fix is to defer these side effects until after attribution and progress checks confirm a restart will actually happen.
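The ordering the comment asks for can be sketched as follows. This is a simplified model with hypothetical callables, not the launcher's actual _handle_restart_decision; it only illustrates running the side effects after the stop/restart decision is known.

```python
from typing import Callable


def handle_restart_decision(
    should_stop: Callable[[], bool],
    notify_peers: Callable[[], None],
    open_rendezvous: Callable[[], None],
    dry_run: bool = False,
) -> bool:
    """Return True if a restart should proceed; run side effects only then."""
    # Attribution is always evaluated, but in dry-run mode its verdict is not applied.
    if should_stop() and not dry_run:
        return False  # peers were never notified and no round was opened
    notify_peers()
    open_rendezvous()
    return True
```

The trade-off of this ordering is that peers learn about the failure only after attribution finishes, so they may wait up to the attribution timeout. The sequence diagram in the summary shows a different design, where peers are notified first and the notification is undone on a best-effort basis if attribution says STOP.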
| if self._remaining_restarts > 0: | ||
| logger.info( | ||
| f"{self._remaining_restarts}/{spec.max_restarts} restart attempts left; restarting worker group...", | ||
| ) | ||
| self._remaining_restarts -= 1 | ||
| # Open rendezvous before restarting (for barrier-based rendezvous) | ||
| self._open_rendezvous_for_restart() | ||
| self._restart_workers(self._worker_group) |
min-healthy policy silently bypasses attribution
_invoke_run_with_min_healthy_policy restarts directly via _open_rendezvous_for_restart() + _restart_workers() without calling _handle_restart_decision() or _run_attribution(). Any user who configures attribution_backends with --ft-restart-policy min-healthy will get attribution silently ignored on every restart — the analysis runs, but its "do not restart" signal is never acted upon.
The fix is to call _handle_restart_decision here just as _invoke_run_with_any_failed_policy does, or at minimum call _run_attribution() and respect its result before proceeding with the restart.
3a0d298 to
5222d8c
Compare
| configure( | ||
| slack_bot_token=slack_token, | ||
| slack_channel=slack_channel, | ||
| dataflow_index=df_idx, | ||
| cluster_name=cluster, | ||
| ) |
Env-var Slack fallback silently disabled
_init_backends always calls configure(slack_bot_token="", ...) when no Slack config is set via CLI/YAML. Reading postprocessing/config.py, configure simply writes its arguments directly into the module-level singleton — passing an empty string sets config.slack_bot_token = "" and the env-var lookup is never triggered. The documented promise that Slack bot-token env vars are consulted as a fallback when --ft-slack-token-file is absent is therefore broken.
The module already provides configure_from_env, which treats a None argument as "not supplied — load from environment" while any non-None value is used as-is. The fix is to call configure_from_env instead of configure, passing None for the token/channel fields when they were not explicitly configured.
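The difference between an empty string and None can be shown with a small stand-in. The helpers below are hypothetical and do not reproduce configure or configure_from_env; the environment variable name SLACK_BOT_TOKEN is also an assumption.

```python
import os
from typing import Optional


def resolve_slack_token(explicit: Optional[str], env_var: str = "SLACK_BOT_TOKEN") -> str:
    """Use the explicit value if one was supplied; otherwise fall back to the environment."""
    if explicit is not None:
        return explicit  # includes "", which silently disables the fallback
    return os.environ.get(env_var, "")


def none_if_unset(value: Optional[str]) -> Optional[str]:
    """Map an empty or missing CLI/YAML value to None so the fallback can trigger."""
    return value if value else None
```

A caller would pass none_if_unset(slack_token) rather than the raw string, so that an unset token reaches the resolver as None and the environment is consulted.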
5222d8c to
69c1e7c
Compare
69c1e7c to
70b1970
Compare
| dataflow_index=ft_dataflow_index, | ||
| llm_api_key_file=llm_key_file, | ||
| ) | ||
| rdzv_configs["attribution_config"] = attribution_cfg.to_dict() |
Slack credential logged in plaintext at job startup
attribution_cfg.to_dict() embeds the raw Slack notification credential in the rendezvous config dict (via SlackConfig.to_dict called with secrets included). That dict is then printed at INFO level in launch_agent (line 1694), so every SLURM job log exposes the credential in plain text.
Since FtRendezvousBarrierHandler and LogAnalysisClient run in the same process as the launcher, the raw credential does not need to travel through the rendezvous config dict at all. The fix is to store only the credential file path there and have the handler read the file directly — a pattern already supported by SlackConfig.from_dict.
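A rough sketch of passing a file reference instead of the secret itself. SlackRefSketch and its fields are hypothetical and do not mirror the real SlackConfig.from_dict; the point is only that the dict placed in rdzv_configs, and therefore logged, carries a path and not the token.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SlackRefSketch:
    # Hypothetical stand-in: holds the path to the token file, never the token.
    token_file: Optional[str] = None
    channel: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        # Safe to embed in a config dict that gets logged at INFO level.
        return {"token_file": self.token_file, "channel": self.channel}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SlackRefSketch":
        return cls(token_file=data.get("token_file"), channel=data.get("channel"))

    def read_token(self) -> str:
        """Read the secret lazily, at the point of use inside the same process."""
        if not self.token_file:
            return ""
        with open(self.token_file, encoding="utf-8") as f:
            return f.read().strip()
```

The consumer rebuilds the object with from_dict and calls read_token only when it needs to send a notification, so the secret stays out of the config dict and out of the job log.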
Integrates log analysis attribution into the fault-tolerance restart decision on the TCPStore host.
Modes: mcp (MCP subprocess; when FR is discovered, uses combined log_fr_analyzer), url (HTTP attrsvc).
CLI (representative): --ft-attribution-loganalysis, --ft-attribution-timeout, --ft-attribution-dry-run, --ft-slack-channel, --ft-slack-token-file, --ft-dataflow-index.