Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/api-core/src/cfg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,29 @@ Extends `StateControllerConfig` with:
| `publish_timeout` | `Duration` | `1s` | Timeout for MQTT publish operations. |
| `queue_capacity` | `usize` | `1024` | Event buffer size for DSX publish work (events dropped when full). |
| `auth` | `MqttAuthConfig` | *(none)* | MQTT authentication settings. |
| `periodic_state_republish` | `PeriodicStateRepublishConfig` | *(disabled)* | Periodically re-publish current managed-host state so consumers that miss change events can reconcile (see [PeriodicStateRepublishConfig](#periodicstaterepublishconfig)). |

### `PeriodicStateRepublishConfig`

In addition to publishing on every state change, NICo can re-publish current
`ManagedHostState` on a timer. Republished messages use the same
`{topic_prefix}/{machineId}/state` topic and JSON payload as change-driven
events, so consumers handle them identically.

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `enabled` | `bool` | `false` | Enable periodic republishing. Change-driven publishing is unaffected by this setting. |
| `interval` | `Duration` | `5m` | How often a republish sweep runs. |
| `scope` | `RepublishScope` | `all` | Which managed hosts to publish each sweep (see [RepublishScope](#republishscope)). |
| `healthy_republish_every` | `u32` | `1` | When `scope = all`, publish healthy hosts only every Nth sweep; hosts with an active health alert are always published every sweep. `0` is treated as `1`. Ignored when `scope = unhealthy_only`. |
| `max_publishes_per_second` | `u32` | `0` | Upper bound on publishes per second within a sweep, to avoid bursting the broker on large sites. `0` disables pacing. |

### `RepublishScope`

| Value | Description |
|-------|-------------|
| `all` | Republish every managed host each sweep (healthy hosts may be published less often via `healthy_republish_every`). |
| `unhealthy_only` | Republish only managed hosts that currently have a health alert. |

### `DpfConfig`

Expand Down
82 changes: 82 additions & 0 deletions crates/api-core/src/cfg/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2168,6 +2168,12 @@ pub struct DsxExchangeEventBusConfig {

#[serde(default)]
pub auth: MqttAuthConfig,

/// Periodically re-publish current `ManagedHostState` in addition to
/// publishing on every state change. Lets integrators that cannot poll the
/// NICo API reconcile transitions they missed off the event bus.
#[serde(default)]
pub periodic_state_republish: PeriodicStateRepublishConfig,
}

impl DsxExchangeEventBusConfig {
Expand All @@ -2184,6 +2190,82 @@ impl DsxExchangeEventBusConfig {
}
}

/// Which managed hosts a periodic republish sweep publishes.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RepublishScope {
/// Republish every managed host on each sweep. Healthy hosts can still be
/// published less often than unhealthy ones via `healthy_republish_every`.
#[default]
All,
/// Republish only managed hosts that currently have a health alert. Use
/// this to keep the event bus quiet and only re-advertise hosts that need
/// attention.
UnhealthyOnly,
}

/// Periodic republishing of `ManagedHostState` on the DSX Exchange Event Bus.
///
/// NICo publishes state on every transition, but integrators that cannot poll
/// the NICo API (e.g. network-restricted consumers) can miss a transition and
/// never reconcile. Re-sending current state on a timer lets those consumers
/// self-heal. Republished messages reuse the same topic and JSON payload as
/// change-driven events, so consumers handle them identically.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PeriodicStateRepublishConfig {
/// Enable periodic republishing. Disabled by default. Change-driven
/// publishing is unaffected by this setting.
#[serde(default)]
pub enabled: bool,

/// How often a republish sweep runs. Defaults to 5 minutes.
#[serde(
default = "PeriodicStateRepublishConfig::default_interval",
deserialize_with = "deserialize_duration",
serialize_with = "as_std_duration"
)]
pub interval: std::time::Duration,

@ianderson-nvidia ianderson-nvidia Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is to be user configurable, you might want to set some const for minimum and maximum values for this duration. You can then clamp() to ensure values always fall within the acceptable range.

const PUBLISH_INTERVAL_MAX: u64 = 120
const PUBLISH_INTERVAL_MIN: u64 = 30

Then just use Durations::from_secs()

impl PeriodicStateRepublishConfig {
  pub fn publish_interval(&self) -> Duration {
    Duration::from_secs(self.interval.clamp(PUBLISH_INTERVAL_MIN, PUBLISH_INTERVAL_MAX))
}


Comment on lines +2221 to +2228

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate interval > 0 to prevent a hot loop.

interval currently accepts zero. With 0s, the run loop can execute continuously and hammer DB + broker. Add deserialization/startup validation to reject zero duration.

Suggested fix
 pub struct PeriodicStateRepublishConfig {
@@
-    pub interval: std::time::Duration,
+    pub interval: std::time::Duration,
@@
 }
+
+impl PeriodicStateRepublishConfig {
+    pub fn validate(&self) -> eyre::Result<()> {
+        if self.interval.is_zero() {
+            return Err(eyre::eyre!(
+                "dsx_exchange_event_bus.periodic_state_republish.interval must be > 0s"
+            ));
+        }
+        Ok(())
+    }
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api-core/src/cfg/file.rs` around lines 2211 - 2218, The
PeriodicStateRepublishConfig.interval field can be zero which allows a hot loop;
add validation that interval > 0 and reject zero during deserialization/startup.
Implement this by validating the parsed std::time::Duration for the
PeriodicStateRepublishConfig (e.g. inside a custom Deserialize impl or a
Post-deserialize validation method used after deserialize) and return an error
when duration.is_zero() (or == Duration::from_secs(0)); mention
PeriodicStateRepublishConfig and the interval field in the error so callers know
the bad config.

/// Which managed hosts to publish on each sweep.
#[serde(default)]
pub scope: RepublishScope,

/// When `scope = all`, publish healthy hosts only every Nth sweep to reduce
/// broker noise; hosts with an active health alert are always published on
/// every sweep. `1` (default) publishes healthy hosts every sweep. `0` is
/// treated as `1`. Ignored when `scope = unhealthy_only`.
#[serde(default = "PeriodicStateRepublishConfig::default_healthy_republish_every")]
pub healthy_republish_every: u32,

/// Upper bound on publishes per second within a single sweep, to avoid
/// bursting the broker on large sites. `0` (default) disables pacing and

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have it on by default. also is there stronger typing we can do? any Rate type?

/// publishes as fast as the broker accepts.
#[serde(default)]
pub max_publishes_per_second: u32,
}

impl Default for PeriodicStateRepublishConfig {
fn default() -> Self {
Self {
enabled: false,
interval: Self::default_interval(),
scope: RepublishScope::default(),
healthy_republish_every: Self::default_healthy_republish_every(),
max_publishes_per_second: 0,
}
Comment on lines +2247 to +2255

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Default enablement contradicts the stated feature contract.

PeriodicStateRepublishConfig::default() sets enabled: false, but this PR’s objective states periodic republish should default to enabled when DSX event bus is enabled. This currently ships the feature off-by-default unless every site opts in manually.

Suggested fix
 impl Default for PeriodicStateRepublishConfig {
     fn default() -> Self {
         Self {
-            enabled: false,
+            enabled: true,
             interval: Self::default_interval(),
             scope: RepublishScope::default(),
             healthy_republish_every: Self::default_healthy_republish_every(),
             max_publishes_per_second: 0,
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api-core/src/cfg/file.rs` around lines 2237 - 2245, The Default impl
for PeriodicStateRepublishConfig currently sets enabled: false which contradicts
the intended behavior; update PeriodicStateRepublishConfig::default() to set
enabled: true so periodic republish is on by default (keeping interval:
Self::default_interval(), scope: RepublishScope::default(),
healthy_republish_every: Self::default_healthy_republish_every(),
max_publishes_per_second: 0), and update any related tests/docs that assume the
feature is off by default. Ensure the change is applied in the Default
implementation and adjust unit tests or config consumers expecting the previous
off-by-default behavior.

}
}

impl PeriodicStateRepublishConfig {
pub const fn default_interval() -> std::time::Duration {
std::time::Duration::from_secs(300)
}

pub const fn default_healthy_republish_every() -> u32 {
1
}
}

/// Auto machine repair plugin related configuration
#[derive(Default, Clone, Copy, Debug, Deserialize, Serialize)]
pub struct AutoMachineRepairPluginConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/api-core/src/mqtt_state_change_hook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@

pub mod hook;
pub mod message;
pub mod republisher;
Loading
Loading