diff --git a/crates/api-core/src/cfg/README.md b/crates/api-core/src/cfg/README.md index abe15bf45c..6dc293b62c 100644 --- a/crates/api-core/src/cfg/README.md +++ b/crates/api-core/src/cfg/README.md @@ -350,6 +350,29 @@ Extends `StateControllerConfig` with: | `publish_timeout` | `Duration` | `1s` | Timeout for MQTT publish operations. | | `queue_capacity` | `usize` | `1024` | Event buffer size for DSX publish work (events dropped when full). | | `auth` | `MqttAuthConfig` | *(none)* | MQTT authentication settings. | +| `periodic_state_republish` | `PeriodicStateRepublishConfig` | *(disabled)* | Periodically re-publish current managed-host state so consumers that miss change events can reconcile (see [PeriodicStateRepublishConfig](#periodicstaterepublishconfig)). | + +### `PeriodicStateRepublishConfig` + +In addition to publishing on every state change, NICo can re-publish current +`ManagedHostState` on a timer. Republished messages use the same +`{topic_prefix}/{machineId}/state` topic and JSON payload as change-driven +events, so consumers handle them identically. + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `enabled` | `bool` | `false` | Enable periodic republishing. Change-driven publishing is unaffected by this setting. | +| `interval` | `Duration` | `5m` | How often a republish sweep runs. | +| `scope` | `RepublishScope` | `all` | Which managed hosts to publish each sweep (see [RepublishScope](#republishscope)). | +| `healthy_republish_every` | `u32` | `1` | When `scope = all`, publish healthy hosts only every Nth sweep; hosts with an active health alert are always published every sweep. `0` is treated as `1`. Ignored when `scope = unhealthy_only`. | +| `max_publishes_per_second` | `u32` | `0` | Upper bound on publishes per second within a sweep, to avoid bursting the broker on large sites. `0` disables pacing. | + +### `RepublishScope` + +| Value | Description | +|-------|-------------| +| `all` | Republish every managed host each sweep (healthy hosts may be published less often via `healthy_republish_every`). | +| `unhealthy_only` | Republish only managed hosts that currently have a health alert. | ### `DpfConfig` diff --git a/crates/api-core/src/cfg/file.rs b/crates/api-core/src/cfg/file.rs index d2666e4feb..50ce809b78 100644 --- a/crates/api-core/src/cfg/file.rs +++ b/crates/api-core/src/cfg/file.rs @@ -2168,6 +2168,12 @@ pub struct DsxExchangeEventBusConfig { #[serde(default)] pub auth: MqttAuthConfig, + + /// Periodically re-publish current `ManagedHostState` in addition to + /// publishing on every state change. Lets integrators that cannot poll the + /// NICo API reconcile transitions they missed off the event bus. + #[serde(default)] + pub periodic_state_republish: PeriodicStateRepublishConfig, } impl DsxExchangeEventBusConfig { @@ -2184,6 +2190,82 @@ impl DsxExchangeEventBusConfig { } } +/// Which managed hosts a periodic republish sweep publishes. +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum RepublishScope { + /// Republish every managed host on each sweep. Healthy hosts can still be + /// published less often than unhealthy ones via `healthy_republish_every`. + #[default] + All, + /// Republish only managed hosts that currently have a health alert. Use + /// this to keep the event bus quiet and only re-advertise hosts that need + /// attention. + UnhealthyOnly, +} + +/// Periodic republishing of `ManagedHostState` on the DSX Exchange Event Bus. +/// +/// NICo publishes state on every transition, but integrators that cannot poll +/// the NICo API (e.g. network-restricted consumers) can miss a transition and +/// never reconcile. Re-sending current state on a timer lets those consumers +/// self-heal. Republished messages reuse the same topic and JSON payload as +/// change-driven events, so consumers handle them identically. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct PeriodicStateRepublishConfig { + /// Enable periodic republishing. Disabled by default. Change-driven + /// publishing is unaffected by this setting. + #[serde(default)] + pub enabled: bool, + + /// How often a republish sweep runs. Defaults to 5 minutes. + #[serde( + default = "PeriodicStateRepublishConfig::default_interval", + deserialize_with = "deserialize_duration", + serialize_with = "as_std_duration" + )] + pub interval: std::time::Duration, + + /// Which managed hosts to publish on each sweep. + #[serde(default)] + pub scope: RepublishScope, + + /// When `scope = all`, publish healthy hosts only every Nth sweep to reduce + /// broker noise; hosts with an active health alert are always published on + /// every sweep. `1` (default) publishes healthy hosts every sweep. `0` is + /// treated as `1`. Ignored when `scope = unhealthy_only`. + #[serde(default = "PeriodicStateRepublishConfig::default_healthy_republish_every")] + pub healthy_republish_every: u32, + + /// Upper bound on publishes per second within a single sweep, to avoid + /// bursting the broker on large sites. `0` (default) disables pacing and + /// publishes as fast as the broker accepts. + #[serde(default)] + pub max_publishes_per_second: u32, +} + +impl Default for PeriodicStateRepublishConfig { + fn default() -> Self { + Self { + enabled: false, + interval: Self::default_interval(), + scope: RepublishScope::default(), + healthy_republish_every: Self::default_healthy_republish_every(), + max_publishes_per_second: 0, + } + } +} + +impl PeriodicStateRepublishConfig { + pub const fn default_interval() -> std::time::Duration { + std::time::Duration::from_secs(300) + } + + pub const fn default_healthy_republish_every() -> u32 { + 1 + } +} + /// Auto machine repair plugin related configuration #[derive(Default, Clone, Copy, Debug, Deserialize, Serialize)] pub struct AutoMachineRepairPluginConfig { diff --git a/crates/api-core/src/mqtt_state_change_hook/mod.rs b/crates/api-core/src/mqtt_state_change_hook/mod.rs index 771df0da75..a4c30f0f99 100644 --- a/crates/api-core/src/mqtt_state_change_hook/mod.rs +++ b/crates/api-core/src/mqtt_state_change_hook/mod.rs @@ -24,3 +24,4 @@ pub mod hook; pub mod message; +pub mod republisher; diff --git a/crates/api-core/src/mqtt_state_change_hook/republisher.rs b/crates/api-core/src/mqtt_state_change_hook/republisher.rs new file mode 100644 index 0000000000..11c4d907e0 --- /dev/null +++ b/crates/api-core/src/mqtt_state_change_hook/republisher.rs @@ -0,0 +1,505 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Periodic republishing of current `ManagedHostState`. +//! +//! [`MqttStateChangeHook`](super::hook::MqttStateChangeHook) publishes state on +//! every transition. Integrators that cannot poll the NICo API (e.g. they are +//! network-restricted) can miss a transition and never reconcile. This module +//! walks the current managed hosts on a timer and re-publishes their state to +//! the same `{topic_prefix}/{machineId}/state` topic with the same JSON payload +//! as change-driven events, so downstream consumers handle both identically and +//! can self-heal. +//! +//! Tuning is driven by [`PeriodicStateRepublishConfig`]: sweep `interval`, +//! whether to publish all hosts or only unhealthy ones (`scope`), how often +//! healthy hosts are re-published relative to unhealthy ones +//! (`healthy_republish_every`), and an optional per-sweep publish rate limit +//! (`max_publishes_per_second`). + +use std::time::Duration; + +use carbide_mqtt_common::hook::MqttPublisher; +use carbide_mqtt_common::metrics::MqttHookMetrics; +use carbide_uuid::machine::MachineId; +use chrono::{DateTime, Utc}; +use health_report::HealthReport; +use model::machine::{HostHealthConfig, LoadSnapshotOptions, ManagedHostState}; +use opentelemetry::metrics::Meter; +use tokio::task::JoinSet; +use tokio::time::{Instant, timeout_at}; +use tokio_util::sync::CancellationToken; + +use crate::cfg::file::{PeriodicStateRepublishConfig, RepublishScope}; +use crate::mqtt_state_change_hook::message::ManagedHostStateChangeMessage; + +/// Periodically re-publishes current `ManagedHostState` for managed hosts to +/// the DSX Exchange Event Bus. +/// +/// Unlike [`MqttStateChangeHook`](super::hook::MqttStateChangeHook), which +/// buffers change events in a bounded queue, the republisher publishes directly +/// from its sweep. A full sweep of a large site can be many publishes, so it +/// would otherwise risk overflowing (and dropping) the change-event queue; +/// publishing directly keeps the two paths independent and lets +/// `max_publishes_per_second` bound the burst. +pub struct ManagedHostStateRepublisher { + publisher: P, + db_pool: sqlx::PgPool, + topic_prefix: String, + publish_timeout: Duration, + config: PeriodicStateRepublishConfig, + host_health_config: HostHealthConfig, + metrics: MqttHookMetrics, +} + +impl ManagedHostStateRepublisher

{ + /// Create a new republisher. + /// + /// Reuses the change-hook publish metrics (`carbide_dsx_event_bus_publish_count`) + /// under the `managed_host_republish` component so periodic publishes can be + /// told apart from change-driven ones on dashboards. + #[allow(clippy::too_many_arguments)] + pub fn new( + publisher: P, + db_pool: sqlx::PgPool, + topic_prefix: String, + publish_timeout: Duration, + config: PeriodicStateRepublishConfig, + host_health_config: HostHealthConfig, + meter: &Meter, + ) -> Self { + let metrics = MqttHookMetrics::without_queue_depth(meter, "managed_host_republish"); + Self { + publisher, + db_pool, + topic_prefix, + publish_timeout, + config, + host_health_config, + metrics, + } + } + + /// Spawn the republisher's background sweep loop into `join_set` when + /// enabled. A no-op when `config.enabled` is false. + pub fn start( + self, + join_set: &mut JoinSet<()>, + cancel_token: CancellationToken, + ) -> std::io::Result<()> { + if self.config.enabled { + tracing::info!( + interval_secs = self.config.interval.as_secs(), + scope = ?self.config.scope, + healthy_republish_every = self.config.healthy_republish_every, + max_publishes_per_second = self.config.max_publishes_per_second, + "Starting periodic managed host state republisher" + ); + join_set + .build_task() + .name("managed_host_state_republisher") + .spawn(async move { self.run(cancel_token).await })?; + } + + Ok(()) + } + + async fn run(self, cancel_token: CancellationToken) { + let mut sweep: u64 = 0; + loop { + let publish_healthy = should_publish_healthy( + self.config.scope, + self.config.healthy_republish_every, + sweep, + ); + if let Err(e) = self.run_sweep(publish_healthy, &cancel_token).await { + tracing::warn!(error = %e, "Managed host state republish sweep failed"); + } + sweep = sweep.wrapping_add(1); + + tokio::select! { + _ = tokio::time::sleep(self.config.interval) => {} + _ = cancel_token.cancelled() => { + tracing::debug!("Managed host state republisher stop requested"); + return; + } + } + } + } + + /// Load every managed host and re-publish those selected by the current + /// scope. Unhealthy hosts are always published; healthy hosts are published + /// only when `publish_healthy` is true for this sweep. + async fn run_sweep( + &self, + publish_healthy: bool, + cancel_token: &CancellationToken, + ) -> eyre::Result<()> { + // Instance data is not needed: the message only carries `machine_id` + // and `managed_state` (read from the host's own state column), and + // aggregate health is derived from host + DPU snapshots. + let options = LoadSnapshotOptions { + include_history: false, + include_instance_data: false, + host_health_config: self.host_health_config, + }; + let snapshots = db::managed_host::load_all(&self.db_pool, options).await?; + + let pacing = pacing_delay(self.config.max_publishes_per_second); + // One timestamp for the whole sweep: it marks when NICo asserts this + // state, not when each individual publish happened. + let timestamp = Utc::now(); + let total = snapshots.len(); + let mut published = 0usize; + let mut skipped_healthy = 0usize; + + for snapshot in &snapshots { + if cancel_token.is_cancelled() { + break; + } + + let unhealthy = report_is_unhealthy(&snapshot.aggregate_health); + if !should_publish(unhealthy, publish_healthy) { + skipped_healthy += 1; + continue; + } + + publish_state( + &self.publisher, + &self.topic_prefix, + self.publish_timeout, + &self.metrics, + &snapshot.host_snapshot.id, + &snapshot.managed_state, + timestamp, + ) + .await; + published += 1; + + if let Some(delay) = pacing { + tokio::select! { + _ = tokio::time::sleep(delay) => {} + _ = cancel_token.cancelled() => break, + } + } + } + + tracing::info!( + total, + published, + skipped_healthy, + publish_healthy, + scope = ?self.config.scope, + "Managed host state republish sweep complete" + ); + + Ok(()) + } +} + +/// Whether healthy hosts should be published on a given sweep (0-indexed). +/// +/// `UnhealthyOnly` never publishes healthy hosts. `All` publishes them every +/// `healthy_republish_every` sweeps (sweep 0 always publishes); `0` is treated +/// as `1`. +fn should_publish_healthy(scope: RepublishScope, healthy_republish_every: u32, sweep: u64) -> bool { + match scope { + RepublishScope::UnhealthyOnly => false, + RepublishScope::All => { + let every = u64::from(healthy_republish_every.max(1)); + sweep.is_multiple_of(every) + } + } +} + +/// Whether a host should be published this sweep: unhealthy hosts always are; +/// healthy hosts only when `publish_healthy` is set for the sweep. +fn should_publish(unhealthy: bool, publish_healthy: bool) -> bool { + unhealthy || publish_healthy +} + +/// A managed host is "unhealthy" when its aggregate health carries at least one +/// alert. +fn report_is_unhealthy(report: &HealthReport) -> bool { + !report.alerts.is_empty() +} + +/// Per-publish delay that bounds a sweep to `max_publishes_per_second`, or +/// `None` when unbounded (`0`). +fn pacing_delay(max_publishes_per_second: u32) -> Option { + (max_publishes_per_second > 0) + .then(|| Duration::from_secs_f64(1.0 / f64::from(max_publishes_per_second))) +} + +/// Serialize and publish a single managed host's current state, bounded by +/// `publish_timeout`, recording the outcome in `metrics`. +async fn publish_state( + publisher: &P, + topic_prefix: &str, + publish_timeout: Duration, + metrics: &MqttHookMetrics, + machine_id: &MachineId, + state: &ManagedHostState, + timestamp: DateTime, +) { + let message = ManagedHostStateChangeMessage { + machine_id, + managed_host_state: state, + timestamp, + }; + + let payload = match message.to_json_bytes() { + Ok(payload) => payload, + Err(e) => { + tracing::error!( + machine_id = %machine_id, + error = %e, + "Failed to serialize managed host state for republish" + ); + metrics.record_serialization_error(); + return; + } + }; + + let topic = format!("{topic_prefix}/{machine_id}/state"); + let deadline = Instant::now() + publish_timeout; + match timeout_at(deadline, publisher.publish(&topic, payload)).await { + Ok(Ok(())) => { + tracing::debug!(topic = %topic, "Republished managed host state to MQTT"); + metrics.record_success(); + } + Ok(Err(e)) => { + tracing::warn!(topic = %topic, error = %e, "Failed to republish managed host state"); + metrics.record_publish_error(); + } + Err(_) => { + tracing::warn!(topic = %topic, "Managed host state republish timed out"); + metrics.record_timeout(); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use carbide_uuid::machine::{MachineIdSource, MachineType}; + use mqttea::MqtteaClientError; + use opentelemetry::global; + use tokio::sync::Barrier; + + use super::*; + + fn test_meter() -> Meter { + global::meter("republisher-test") + } + + fn test_metrics() -> MqttHookMetrics { + MqttHookMetrics::without_queue_depth(&test_meter(), "managed_host_republish_test") + } + + fn test_machine_id() -> MachineId { + MachineId::new( + MachineIdSource::ProductBoardChassisSerial, + [0; 32], + MachineType::Host, + ) + } + + /// Publisher that forwards each published (topic, payload) over a channel. + struct SignalingPublisher { + sender: tokio::sync::mpsc::UnboundedSender<(String, Vec)>, + } + + impl SignalingPublisher { + fn new() -> ( + Self, + tokio::sync::mpsc::UnboundedReceiver<(String, Vec)>, + ) { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + (Self { sender }, receiver) + } + } + + #[async_trait::async_trait] + impl MqttPublisher for SignalingPublisher { + async fn publish(&self, topic: &str, payload: Vec) -> Result<(), MqtteaClientError> { + let _ = self.sender.send((topic.to_string(), payload)); + Ok(()) + } + } + + #[test] + fn unhealthy_only_never_publishes_healthy() { + for sweep in 0..5 { + assert!(!should_publish_healthy( + RepublishScope::UnhealthyOnly, + 1, + sweep + )); + } + } + + #[test] + fn all_scope_every_one_publishes_healthy_every_sweep() { + for sweep in 0..5 { + assert!(should_publish_healthy(RepublishScope::All, 1, sweep)); + } + } + + #[test] + fn all_scope_zero_cadence_is_treated_as_one() { + for sweep in 0..5 { + assert!(should_publish_healthy(RepublishScope::All, 0, sweep)); + } + } + + #[test] + fn all_scope_every_three_publishes_healthy_on_multiples() { + let got: Vec = (0..7) + .map(|sweep| should_publish_healthy(RepublishScope::All, 3, sweep)) + .collect(); + assert_eq!( + got, + vec![true, false, false, true, false, false, true], + "healthy hosts publish on sweeps 0, 3, 6" + ); + } + + #[test] + fn unhealthy_hosts_always_publish_regardless_of_healthy_flag() { + assert!(should_publish(true, false)); + assert!(should_publish(true, true)); + } + + #[test] + fn healthy_hosts_publish_only_when_flag_set() { + assert!(!should_publish(false, false)); + assert!(should_publish(false, true)); + } + + #[test] + fn report_with_no_alerts_is_healthy() { + assert!(!report_is_unhealthy(&HealthReport::empty( + "test".to_string() + ))); + } + + #[test] + fn report_with_an_alert_is_unhealthy() { + // `missing_report` carries a single alert. + assert!(report_is_unhealthy(&HealthReport::missing_report())); + } + + #[test] + fn pacing_disabled_when_zero() { + assert_eq!(pacing_delay(0), None); + } + + #[test] + fn pacing_divides_one_second_by_rate() { + assert_eq!(pacing_delay(10), Some(Duration::from_millis(100))); + assert_eq!(pacing_delay(1), Some(Duration::from_secs(1))); + } + + #[tokio::test] + async fn publish_state_uses_state_topic_and_payload() { + let (publisher, mut receiver) = SignalingPublisher::new(); + let metrics = test_metrics(); + let id = test_machine_id(); + let state = ManagedHostState::Ready; + + publish_state( + &publisher, + "NICO/v1/machine", + Duration::from_secs(1), + &metrics, + &id, + &state, + Utc::now(), + ) + .await; + + let (topic, payload) = receiver.recv().await.expect("should receive message"); + assert_eq!(topic, format!("NICO/v1/machine/{id}/state")); + + let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); + assert_eq!( + parsed + .get("managed_host_state") + .unwrap() + .get("state") + .unwrap(), + "ready" + ); + assert!(parsed.get("machine_id").is_some()); + assert!(parsed.get("timestamp").is_some()); + } + + #[tokio::test] + async fn publish_state_respects_publish_timeout() { + let started = Arc::new(Barrier::new(2)); + let call_count = Arc::new(AtomicUsize::new(0)); + let complete_count = Arc::new(AtomicUsize::new(0)); + + struct TimeoutPublisher { + started: Arc, + call_count: Arc, + complete_count: Arc, + } + + #[async_trait::async_trait] + impl MqttPublisher for TimeoutPublisher { + async fn publish(&self, _: &str, _: Vec) -> Result<(), MqtteaClientError> { + self.call_count.fetch_add(1, Ordering::SeqCst); + self.started.wait().await; + std::future::pending::<()>().await; + self.complete_count.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + let publisher = TimeoutPublisher { + started: started.clone(), + call_count: call_count.clone(), + complete_count: complete_count.clone(), + }; + let metrics = test_metrics(); + let id = test_machine_id(); + let state = ManagedHostState::Ready; + + let publish = publish_state( + &publisher, + "NICO/v1/machine", + Duration::from_millis(10), + &metrics, + &id, + &state, + Utc::now(), + ); + + // The publish returns once the timeout fires; the inner publish never + // completes (it is parked on `pending`). + tokio::join!(publish, async { + started.wait().await; + }); + + assert_eq!(call_count.load(Ordering::SeqCst), 1); + assert_eq!(complete_count.load(Ordering::SeqCst), 0); + } +} diff --git a/crates/api-core/src/setup.rs b/crates/api-core/src/setup.rs index dd61678008..e588164c50 100644 --- a/crates/api-core/src/setup.rs +++ b/crates/api-core/src/setup.rs @@ -104,6 +104,7 @@ use crate::logging::service_health_metrics::{ use crate::machine_update_manager::MachineUpdateManager; use crate::measured_boot::metrics_collector::MeasuredBootMetricsCollector; use crate::mqtt_state_change_hook::hook::MqttStateChangeHook; +use crate::mqtt_state_change_hook::republisher::ManagedHostStateRepublisher; use crate::scout_stream::ConnectionRegistry; use crate::{attestation, db_init, ethernet_virtualization, listener}; @@ -1040,6 +1041,19 @@ pub async fn initialize_and_start_controllers<'a>( .set(bms_client) .map_err(|_| eyre::eyre!("BMS DSX Exchange handle already initialized"))?; + // Periodically re-publish current managed host state so consumers + // that miss change events can reconcile. A no-op unless enabled. + ManagedHostStateRepublisher::new( + client.clone(), + db_pool.clone(), + config.topic_prefix.clone(), + config.publish_timeout, + config.periodic_state_republish.clone(), + carbide_config.host_health, + &meter, + ) + .start(join_set, cancel_token.clone())?; + emitter_builder = emitter_builder.hook(Box::new(MqttStateChangeHook::new( client, join_set, diff --git a/crates/mqtt-common/src/metrics.rs b/crates/mqtt-common/src/metrics.rs index df7d5246cd..5bc9ca4d64 100644 --- a/crates/mqtt-common/src/metrics.rs +++ b/crates/mqtt-common/src/metrics.rs @@ -57,17 +57,29 @@ impl MqttHookMetrics { }) .build(); - let publish_count = meter - .u64_counter("carbide_dsx_event_bus_publish_count") - .with_description("Total number of MQTT publish attempts") - .build(); + Self { + publish_count: Self::build_publish_count(meter), + component, + } + } + /// Create metrics for a publisher that does not buffer messages in a + /// bounded queue, so no queue-depth gauge is registered. Used by the + /// periodic state republisher, which publishes directly from its sweep. + pub fn without_queue_depth(meter: &Meter, component: &'static str) -> Self { Self { - publish_count, + publish_count: Self::build_publish_count(meter), component, } } + fn build_publish_count(meter: &Meter) -> Counter { + meter + .u64_counter("carbide_dsx_event_bus_publish_count") + .with_description("Total number of MQTT publish attempts") + .build() + } + fn attrs(&self, status: &'static str) -> [KeyValue; 2] { [ KeyValue::new("component", self.component),