Add dmesg health logging and attribution#328
Conversation
a63f0fb to
708b0e7
Compare
Greptile SummaryThis PR introduces
Confidence Score: 4/5The PR is safe to merge. The FACT agent integration is well-isolated behind a flag, the teardown order is correct, and the previously-identified blocking issues have been addressed in this revision. The core data path is structurally sound. The remaining findings are narrow timing races that are benign under normal network conditions given the multi-hundred-millisecond FACT GET latency. agent.py: the _wait_for_grpc_writer_queue queue.empty() timing and the additive observation_deadline_s budget. hot_cache.py: unguarded add_current_cycle under concurrent cycles. Important Files Changed
Sequence DiagramsequenceDiagram
participant StoreHost as ft_launcher (store host)
participant Launcher as ft_launcher (leaf nodes)
participant StoreHostAgent as nvrx-fact-agent (store host)
participant FactAgent as nvrx-fact-agent (leaf)
participant TCPStore as TCPStore
participant FACT as FACT API
participant gRPC as gRPC Log Server
Note over StoreHost,Launcher: Cycle fails
StoreHost->>StoreHostAgent: notify_fact_agent(cycle_failed, expected_nodes)
Launcher->>FactAgent: "notify_fact_agent(cycle_failed, expected_nodes=[])"
StoreHostAgent-->>StoreHost: ACK
FactAgent-->>Launcher: ACK
par Store-host path
StoreHostAgent->>FACT: POST /attributor
FACT-->>StoreHostAgent: attributor_id
StoreHostAgent->>TCPStore: set(attributor_id_key, attributor_id)
StoreHostAgent->>StoreHostAgent: collect dmesg
StoreHostAgent->>FACT: POST /observation
StoreHostAgent->>gRPC: write fact_observation JSONL
StoreHostAgent->>TCPStore: add(done_count, 1)
StoreHostAgent->>TCPStore: "poll done_count == N"
and Leaf path
FactAgent->>FactAgent: collect dmesg
FactAgent->>TCPStore: get(attributor_id_key)
TCPStore-->>FactAgent: attributor_id
FactAgent->>FACT: POST /observation
FactAgent->>gRPC: write fact_observation JSONL
FactAgent->>TCPStore: add(done_count, 1)
end
StoreHostAgent->>FACT: GET /attribution
FACT-->>StoreHostAgent: FactAttributionResult
StoreHostAgent->>StoreHostAgent: compute_repeat_offender_decision
StoreHostAgent->>gRPC: write fact_result JSONL
StoreHost->>StoreHostAgent: get_avoid_nodes
StoreHostAgent-->>StoreHost: avoid_nodes
Reviews (20): Last reviewed commit: "Add FACT dmesg attribution integration" | Re-trigger Greptile |
| except Exception as exc: | ||
| logger.warning( | ||
| "Skipping direct FACT attribution for cycle %s: failed to create " | ||
| "attributor: %s", | ||
| cycle_index, | ||
| exc, | ||
| ) | ||
| return | ||
| else: | ||
| raw_attributor_id = self._store_get_bytes_with_deadline( | ||
| store, | ||
| attributor_key, | ||
| self._ft_cfg.fact_attribution_timeout, | ||
| ) | ||
| if not raw_attributor_id: | ||
| logger.warning( | ||
| "Skipping direct FACT attribution for cycle %s: timed out waiting " | ||
| "for attributor id", | ||
| cycle_index, | ||
| ) | ||
| return | ||
| attributor_id = raw_attributor_id.decode("utf-8") |
There was a problem hiding this comment.
Missing failure sentinel delays restart on all non-store-host nodes
When the store host fails to create a FACT attributor (e.g. the FACT service is unreachable), it logs a warning and returns without writing anything to attributor_key. Every non-store-host node is concurrently blocked in _stop_workers calling _store_get_bytes_with_deadline(store, attributor_key, self._ft_cfg.fact_attribution_timeout) (default 60 s). Since the key is never set, each non-store-host node waits the full 60 seconds before returning. Meanwhile the store host may already be in rendezvous for the next cycle, and if the rendezvous window is shorter than 60 s the non-store-host nodes miss it entirely — turning a transient FACT outage into a compounded restart failure. Writing a sentinel (e.g. b"failed") to attributor_key immediately after the exception would let non-store-host nodes detect the failure immediately.
| # FACT attribution service configuration (optional) | ||
| fact_attribution_url: Optional[str] = None | ||
| fact_attribution_timeout: float = 60.0 | ||
| fact_attribution_dmesg_source: str = "health_log" |
There was a problem hiding this comment.
The
fact_attribution_dmesg_source field only has two valid values ("health_log" and "direct"), but any other string (e.g. "health-log" with a typo) is silently accepted and treated as "health_log" mode. Adding validation in __post_init__ gives early, clear feedback.
| fact_attribution_dmesg_source: str = "health_log" | |
| fact_attribution_dmesg_source: str = "health_log" | |
| _VALID_DMESG_SOURCES: ClassVar[frozenset[str]] = frozenset({"health_log", "direct"}) |
| def serve_forever(self, *, max_rpc_bytes: int = DEFAULT_MAX_RPC_BYTES) -> None: | ||
| socket_path = Path(self.socket_path) | ||
| if socket_path.exists(): | ||
| socket_path.unlink() | ||
| socket_path.parent.mkdir(parents=True, exist_ok=True) | ||
| with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server: | ||
| server.bind(str(socket_path)) | ||
| os.chmod(socket_path, 0o600) | ||
| server.listen(128) | ||
| logger.info("nvrx-evidencesvc listening on %s", socket_path) | ||
| while not self._stop_event.is_set(): | ||
| conn, _ = server.accept() | ||
| threading.Thread( | ||
| target=self._handle_connection, | ||
| args=(conn, max_rpc_bytes), | ||
| daemon=True, | ||
| ).start() |
There was a problem hiding this comment.
serve_forever cannot be shut down cleanly
server.accept() blocks indefinitely; the while not self._stop_event.is_set() guard is evaluated only after a new connection arrives. Setting _stop_event from another thread will not wake the blocking call. There is no settimeout on the server socket, no select/poll loop, and no stop() method on EvidenceService. In practice the process must be killed to exit. Adding server.settimeout(1.0) and catching socket.timeout inside the loop would allow clean shutdown.
| def _write_via_grpc(self, text: str) -> None: | ||
| assert self.output_path is not None | ||
| channel = grpc.insecure_channel( | ||
| self.grpc_server_address, | ||
| options=[ | ||
| ("grpc.max_send_message_length", 10 * 1024 * 1024), | ||
| ("grpc.max_receive_message_length", 10 * 1024 * 1024), | ||
| ], | ||
| ) | ||
| try: | ||
| stub = log_aggregation_pb2_grpc.LogAggregationServiceStub(channel) | ||
| chunk = log_aggregation_pb2.LogChunk( | ||
| node_id=self.node_id or "", | ||
| data=text.encode("utf-8"), | ||
| file_path=self.output_path, | ||
| ) | ||
| stub.StreamLogs(iter([chunk]), timeout=_GRPC_TIMEOUT_S) | ||
| finally: | ||
| channel.close() |
There was a problem hiding this comment.
New gRPC channel created per dmesg collection
_write_via_grpc creates a brand-new grpc.insecure_channel for every write. The periodic collection fires every _PERIODIC_INTERVAL_S (180 s) plus once on session_stop and every collect_now, so channel churn is low in steady state. Still, stashing the channel on the DmesgCollector instance (lazily created, closed on shutdown) would be cleaner and avoids the connection establishment overhead on the hot path during failure detection when collect_now is called.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
708b0e7 to
209b68f
Compare
209b68f to
412d403
Compare
| try: | ||
| done_count = int(store.add(keys.done_count, 0)) | ||
| except Exception: | ||
| done_count = 0 | ||
| progressed = False | ||
| while next_index <= done_count: | ||
| raw_node = self._store_get_bytes_with_deadline(store, keys.done(next_index), 0.2) | ||
| if not raw_node: | ||
| break | ||
| next_index += 1 | ||
| node = raw_node.decode("utf-8") | ||
| raw_status = self._store_get_bytes_with_deadline(store, keys.status(node), 0.2) | ||
| if not raw_status: | ||
| continue | ||
| try: | ||
| status = json.loads(raw_status.decode("utf-8")) | ||
| if isinstance(status, dict): | ||
| statuses[str(status.get("node") or node)] = status | ||
| progressed = True | ||
| except json.JSONDecodeError: | ||
| logger.warning("evidencesvc found invalid status payload for %s", node) | ||
| if not progressed: | ||
| time.sleep(0.1) |
There was a problem hiding this comment.
Node permanently skipped when status read returns
None
After done(N) is consumed and next_index is already incremented, a continue on the raw_status is None branch advances past that node permanently. Neither the outer loop nor any later inner-loop iteration ever goes back to recheck it, so the node ends up in the timeout bucket and its FACT observation is excluded from attribution.
The trigger is _store_get_bytes_with_deadline returning None — which happens for any TCPStore exception (the bare except Exception: return None at line 452 converts store errors into key-absence), not just for genuine 0.2 s timeouts. If a transient store error occurs while reading keys.status(node) right after done(N) was successfully read, the node is silently lost.
A minimal fix: decrement next_index before continue so the done/status pair is retried on the next outer-loop iteration (bounded by the outer deadline, so no infinite loop risk). Alternatively, collect "pending status" nodes in a separate set and drain them in subsequent outer-loop passes.
349c2ea to
7710c9c
Compare
| try: | ||
| raw_attributor_id = self._store_get_bytes_with_deadline( | ||
| store, keys.attributor_id, request.store_timeout_s | ||
| ) | ||
| if not raw_attributor_id: | ||
| raise RuntimeError("timed out waiting for attributor_id") | ||
| attributor_id = raw_attributor_id.decode("utf-8") | ||
| except Exception as exc: | ||
| status.update(status="post_failed", error=str(exc)) | ||
| self._write_terminal_status(store, keys, node, status) | ||
| return | ||
|
|
||
| end_time = datetime.now(timezone.utc) | ||
| start_time = end_time - timedelta(seconds=max(0.0, self.dmesg_window_s)) |
There was a problem hiding this comment.
The observation's time window metadata sent to FACT is anchored to the moment of submission, not the moment dmesg was collected.
dmesg_text is collected at line 333, but end_time is captured at line 364 — after _store_get_bytes_with_deadline(…, request.store_timeout_s), which can block for up to store_timeout_s (default 60 s) on leaf nodes waiting for the store-host to publish attributor_id. If the store-host takes 30 s to reach FACT and write attributor_id, every leaf node sends FACT an observation whose claimed window is [T+30 − 720s, T+30] while the actual dmesg content covers [T − 720s, T] — a 30-second forward shift. FACT's cross-node time correlation and window-based filtering will therefore be systematically misaligned for leaf nodes.
| try: | |
| raw_attributor_id = self._store_get_bytes_with_deadline( | |
| store, keys.attributor_id, request.store_timeout_s | |
| ) | |
| if not raw_attributor_id: | |
| raise RuntimeError("timed out waiting for attributor_id") | |
| attributor_id = raw_attributor_id.decode("utf-8") | |
| except Exception as exc: | |
| status.update(status="post_failed", error=str(exc)) | |
| self._write_terminal_status(store, keys, node, status) | |
| return | |
| end_time = datetime.now(timezone.utc) | |
| start_time = end_time - timedelta(seconds=max(0.0, self.dmesg_window_s)) | |
| collection_end_time = datetime.now(timezone.utc) | |
| try: | |
| raw_attributor_id = self._store_get_bytes_with_deadline( | |
| store, keys.attributor_id, request.store_timeout_s | |
| ) | |
| if not raw_attributor_id: | |
| raise RuntimeError("timed out waiting for attributor_id") | |
| attributor_id = raw_attributor_id.decode("utf-8") | |
| except Exception as exc: | |
| status.update(status="post_failed", error=str(exc)) | |
| self._write_terminal_status(store, keys, node, status) | |
| return | |
| end_time = collection_end_time | |
| start_time = end_time - timedelta(seconds=max(0.0, self.dmesg_window_s)) |
c1f13cf to
c03b987
Compare
| @staticmethod | ||
| def _store_get_bytes_with_deadline(store: Any, key: str, timeout_s: float) -> Optional[bytes]: | ||
| deadline = time.monotonic() + max(0.0, timeout_s) | ||
| while True: | ||
| try: | ||
| if store.check([key]): | ||
| return store.get(key) | ||
| except Exception: | ||
| return None | ||
| remaining = deadline - time.monotonic() | ||
| if remaining <= 0: | ||
| return None | ||
| time.sleep(min(0.1, remaining)) |
There was a problem hiding this comment.
Transient store exceptions immediately abort the entire deadline-bounded retry loop. Changing the
except branch to continue (with a short sleep) lets the loop honour the full timeout_s budget rather than failing at the first network blip.
| @staticmethod | |
| def _store_get_bytes_with_deadline(store: Any, key: str, timeout_s: float) -> Optional[bytes]: | |
| deadline = time.monotonic() + max(0.0, timeout_s) | |
| while True: | |
| try: | |
| if store.check([key]): | |
| return store.get(key) | |
| except Exception: | |
| return None | |
| remaining = deadline - time.monotonic() | |
| if remaining <= 0: | |
| return None | |
| time.sleep(min(0.1, remaining)) | |
| @staticmethod | |
| def _store_get_bytes_with_deadline(store: Any, key: str, timeout_s: float) -> Optional[bytes]: | |
| deadline = time.monotonic() + max(0.0, timeout_s) | |
| while True: | |
| try: | |
| if store.check([key]): | |
| return store.get(key) | |
| except Exception: | |
| pass # transient error – retry until deadline | |
| remaining = deadline - time.monotonic() | |
| if remaining <= 0: | |
| return None | |
| time.sleep(min(0.1, remaining)) |
ddba1b4 to
7929030
Compare
2cae5ce to
d551949
Compare
| def stop(self) -> None: | ||
| self._stop_event.set() | ||
| with self._grpc_writers_lock: | ||
| writers = list(self._grpc_writers.values()) | ||
| self._grpc_writers.clear() | ||
| self._wait_for_grpc_writer_queues(writers) | ||
| for _, writer in writers: | ||
| shutdown = getattr(writer, "shutdown", None) | ||
| if callable(shutdown): | ||
| with contextlib.suppress(Exception): | ||
| shutdown() | ||
| for _, writer in writers: | ||
| with contextlib.suppress(Exception): | ||
| writer.join(timeout=5.0) |
There was a problem hiding this comment.
stop() snapshots gRPC writers before in-flight executor tasks create theirs
stop() acquires _grpc_writers_lock, snapshots the current dict, clears it, then drains only those known writers. A process_cycle_failed task submitted to _executor shortly before the shutdown signal may still be running — it calls _enqueue_grpc_artifact → _get_grpc_writer after the lock is released and the map is already empty, which creates a brand-new GrpcWriterThread(daemon=True) entry that is never added to the drained set. When the process subsequently exits, the daemon writer thread is killed while its queue may still hold unflushed JSONL result records, silently dropping the last fact_observation or fact_result entry.
Fix: call self._executor.shutdown(wait=True, cancel_futures=False) before the _grpc_writers_lock block in stop(). FactAgentManager already kills the process after _FACT_AGENT_STOP_TIMEOUT = 10 s if graceful shutdown stalls, so adding the drain is safe.
| while time.monotonic() < deadline and completed_count < expected_node_count: | ||
| try: | ||
| completed_count = int(store.add(keys.done_count, 0)) | ||
| except Exception: | ||
| completed_count = 0 |
There was a problem hiding this comment.
Store exception resets
completed_count to 0, masking real progress
When store.add(keys.done_count, 0) throws (network blip, TCP timeout), the except Exception clause resets completed_count to 0. If the exception fires repeatedly near the deadline, the last polled value is 0 and the fact_result JSONL record carries completed_node_count=0 even though many nodes may have actually reported. Changing the handler to pass retains the last-known monotonically non-decreasing value.
| while time.monotonic() < deadline and completed_count < expected_node_count: | |
| try: | |
| completed_count = int(store.add(keys.done_count, 0)) | |
| except Exception: | |
| completed_count = 0 | |
| while time.monotonic() < deadline and completed_count < expected_node_count: | |
| try: | |
| completed_count = int(store.add(keys.done_count, 0)) | |
| except Exception: | |
| pass # keep the last known count; done_count is monotonically non-decreasing |
2baa7a1 to
0e4e69b
Compare
| grpc_graceful_shutdown_timeout = float( | ||
| getattr(args, 'ft_log_server_graceful_shutdown_timeout', 60.0) | ||
| ) | ||
| stop_grpc_log_servers(_GRPC_SERVER_PROCESSES, grpc_graceful_shutdown_timeout) | ||
|
|
||
| if _FACT_AGENT_MANAGER is not None: | ||
| _FACT_AGENT_MANAGER.stop() | ||
| _FACT_AGENT_MANAGER = None |
There was a problem hiding this comment.
gRPC server torn down before FACT agent is stopped, dropping result records
stop_grpc_log_servers on line 3822 blocks until the gRPC log servers shut down, then _FACT_AGENT_MANAGER.stop() on line 3825 sends the graceful shutdown signal to the agent subprocess. Inside the subprocess, FactAgent.stop() drains its internal GrpcWriterThread queues (up to _GRPC_RESULT_DRAIN_TIMEOUT_S = 4 s) before exiting. At that point the gRPC server that should receive the writes is already gone, so all pending writes fail silently. The most likely victim is the final fact_result JSONL record — the one the store host writes after receiving the full FactAttributionResult — because it is enqueued last and may still be in the writer queue when the manager calls stop().
Swapping the order so _FACT_AGENT_MANAGER.stop() completes before stop_grpc_log_servers is invoked lets the agent flush into a still-live server.
660baf2 to
24eb289
Compare
24eb289 to
35e5365
Compare
Summary
Adds failed-cycle dmesg collection through
nvrx-fact-agent, an NVRx-side client for FACT node attribution.Highlights:
nvrx-fact-agent: users provide--ft-fact-url; FT starts the local agent, sends a private UDScycle_failednotification, and immediately continues after ACK.ft_launcher; the agent collects bounded per-node dmesg windows, POSTs observations, and the store-host agent performs the FACT GET.nvidia_resiliency_ext.attribution.fact:agent.pyowns node-local collection/reporting,manager.pyowns launcher-side process lifecycle, andclient.pyowns FACT HTTP shaping/client calls.--ft-attribution-endpointis application-log attribution for job-levelSTOP/RESTART, while--ft-fact-urlis FACT node attribution used as node-suspicion/exclusion evidence.--ft-health-log-prefixand--ft-enable-health-log-dmesgare configured. Empty raw dmesg windows do not create 0-byte files.fact_observationrecords for terminal outcomes includingsubmitted,empty, and failure states; the store-host emits the fullFactAttributionResultasfact_result.attributor_idanddone_countin TCPStore for the FACT flow; observation details stay in FACT and the optional JSONL artifact.Validation
Local checks:
black --checkon changed Python files: passedisort --check-onlyon changed Python files: passedruff checkon changed Python files: passedpy_compileon changed Python files/tests: passedgit diff --check: passedPYTHONPATH=src python3 -m pytest -q tests/attribution/unit/test_fact_manager.py: passedEarlier local checks on this branch:
PYTHONPATH=src python3 -m pytest -q tests/attribution/unit/test_fact_client.py tests/attribution/unit/test_fact_manager.py: passedtests/attribution/unit/test_fact_agent.pywas not runnable in the local system Python becausetorchis not installed.tests/fault_tolerance/unit/test_launcher.py -k factandtests/fault_tolerance/unit/test_config.pywere not runnable in the local system Python becauseyamlis not installed.Dev cluster probe:
2830with Xid injection only ongb-nvl-134-compute01confirmed injected-node dmesg capture and no 0-byte dmesg artifact files.done_count. Final dev rerun is pending because SSH to the dev login started closing connections.