Skip to content

feat: Add periodic managed host state republishing#2359

Open
kfelternv wants to merge 3 commits into
NVIDIA:mainfrom
kfelternv:dsx-periodic-republish
Open

feat: Add periodic managed host state republishing#2359
kfelternv wants to merge 3 commits into
NVIDIA:mainfrom
kfelternv:dsx-periodic-republish

Conversation

@kfelternv

@kfelternv kfelternv commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Starts a background process that iterates through all of the machines managed by nico and reports the status to the message bus based on user configurations of how often they want the data and what data they want. Configs:

[dsx_exchange_event_bus.periodic_state_republish]

Config Default Description
enabled true Enable periodic republishing (on by default whenever the DSX Exchange Event Bus is enabled). Change-driven publishing on state transitions is unaffected by this setting.
interval 5m How often a republish sweep runs (loads all managed hosts and publishes their current state).
scope all Which managed hosts to publish each sweep: all, or unhealthy_only (only hosts that currently have a health alert).
healthy_republish_every 1 When scope = all, publish healthy hosts only every Nth sweep; hosts with an active health alert are always published every sweep. 0 is treated as 1. Ignored when scope = unhealthy_only.
max_publishes_per_second 0 Upper bound on publishes per second within a single sweep, to avoid bursting the broker on large sites. 0 disables pacing.

@copy-pr-bot

copy-pr-bot Bot commented Jun 10, 2026

Copy link
Copy Markdown

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@coderabbitai

coderabbitai Bot commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 0cf3565e-16ee-4739-9a73-27913926eb55

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
crates/api-core/src/mqtt_state_change_hook/republisher.rs (1)

75-84: ⚡ Quick win

Avoid introducing a new #[allow(clippy::too_many_arguments)] here.

Please replace the suppression with a small parameter struct (or grouped config object) so clippy stays fully enforced on new code.

Suggested refactor sketch
-#[allow(clippy::too_many_arguments)]
-pub fn new(
-    publisher: P,
-    db_pool: sqlx::PgPool,
-    topic_prefix: String,
-    publish_timeout: Duration,
-    config: PeriodicStateRepublishConfig,
-    host_health_config: HostHealthConfig,
-    meter: &Meter,
-) -> Self {
+pub struct ManagedHostStateRepublisherParams {
+    pub db_pool: sqlx::PgPool,
+    pub topic_prefix: String,
+    pub publish_timeout: Duration,
+    pub config: PeriodicStateRepublishConfig,
+    pub host_health_config: HostHealthConfig,
+}
+
+pub fn new(publisher: P, params: ManagedHostStateRepublisherParams, meter: &Meter) -> Self {
     let metrics = MqttHookMetrics::without_queue_depth(meter, "managed_host_republish");
     Self {
         publisher,
-        db_pool,
-        topic_prefix,
-        publish_timeout,
-        config,
-        host_health_config,
+        db_pool: params.db_pool,
+        topic_prefix: params.topic_prefix,
+        publish_timeout: params.publish_timeout,
+        config: params.config,
+        host_health_config: params.host_health_config,
         metrics,
     }
 }

As per coding guidelines, “Avoid using #[allow(...)] annotations unless you have a strong reason to do so. New code should generally not have to #[allow] any lints or warnings.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api-core/src/mqtt_state_change_hook/republisher.rs` around lines 75 -
84, Replace the #[allow(clippy::too_many_arguments)] on Republisher::new by
introducing a small parameter struct (e.g., RepublisherParams or
RepublisherConfig) that groups related arguments (topic_prefix, publish_timeout,
config: PeriodicStateRepublishConfig, host_health_config: HostHealthConfig,
meter: Meter or MeterRef) and then change the constructor signature pub fn
new(publisher: P, db_pool: sqlx::PgPool, params: RepublisherParams) -> Self;
update the call sites and the body of new to pull values from the params struct
and remove the allow attribute so clippy remains enabled.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/api-core/src/cfg/file.rs`:
- Around line 2211-2218: The PeriodicStateRepublishConfig.interval field can be
zero which allows a hot loop; add validation that interval > 0 and reject zero
during deserialization/startup. Implement this by validating the parsed
std::time::Duration for the PeriodicStateRepublishConfig (e.g. inside a custom
Deserialize impl or a Post-deserialize validation method used after deserialize)
and return an error when duration.is_zero() (or == Duration::from_secs(0));
mention PeriodicStateRepublishConfig and the interval field in the error so
callers know the bad config.
- Around line 2237-2245: The Default impl for PeriodicStateRepublishConfig
currently sets enabled: false which contradicts the intended behavior; update
PeriodicStateRepublishConfig::default() to set enabled: true so periodic
republish is on by default (keeping interval: Self::default_interval(), scope:
RepublishScope::default(), healthy_republish_every:
Self::default_healthy_republish_every(), max_publishes_per_second: 0), and
update any related tests/docs that assume the feature is off by default. Ensure
the change is applied in the Default implementation and adjust unit tests or
config consumers expecting the previous off-by-default behavior.

---

Nitpick comments:
In `@crates/api-core/src/mqtt_state_change_hook/republisher.rs`:
- Around line 75-84: Replace the #[allow(clippy::too_many_arguments)] on
Republisher::new by introducing a small parameter struct (e.g.,
RepublisherParams or RepublisherConfig) that groups related arguments
(topic_prefix, publish_timeout, config: PeriodicStateRepublishConfig,
host_health_config: HostHealthConfig, meter: Meter or MeterRef) and then change
the constructor signature pub fn new(publisher: P, db_pool: sqlx::PgPool,
params: RepublisherParams) -> Self; update the call sites and the body of new to
pull values from the params struct and remove the allow attribute so clippy
remains enabled.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 8912d241-b8ca-4d92-aa13-8947752939af

📥 Commits

Reviewing files that changed from the base of the PR and between 205ca89 and b742932.

📒 Files selected for processing (6)
  • crates/api-core/src/cfg/README.md
  • crates/api-core/src/cfg/file.rs
  • crates/api-core/src/mqtt_state_change_hook/mod.rs
  • crates/api-core/src/mqtt_state_change_hook/republisher.rs
  • crates/api-core/src/setup.rs
  • crates/mqtt-common/src/metrics.rs

Comment on lines +2211 to +2218
/// How often a republish sweep runs. Defaults to 5 minutes.
#[serde(
default = "PeriodicStateRepublishConfig::default_interval",
deserialize_with = "deserialize_duration",
serialize_with = "as_std_duration"
)]
pub interval: std::time::Duration,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate interval > 0 to prevent a hot loop.

interval currently accepts zero. With 0s, the run loop can execute continuously and hammer DB + broker. Add deserialization/startup validation to reject zero duration.

Suggested fix
 pub struct PeriodicStateRepublishConfig {
@@
-    pub interval: std::time::Duration,
+    pub interval: std::time::Duration,
@@
 }
+
+impl PeriodicStateRepublishConfig {
+    pub fn validate(&self) -> eyre::Result<()> {
+        if self.interval.is_zero() {
+            return Err(eyre::eyre!(
+                "dsx_exchange_event_bus.periodic_state_republish.interval must be > 0s"
+            ));
+        }
+        Ok(())
+    }
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api-core/src/cfg/file.rs` around lines 2211 - 2218, The
PeriodicStateRepublishConfig.interval field can be zero which allows a hot loop;
add validation that interval > 0 and reject zero during deserialization/startup.
Implement this by validating the parsed std::time::Duration for the
PeriodicStateRepublishConfig (e.g. inside a custom Deserialize impl or a
Post-deserialize validation method used after deserialize) and return an error
when duration.is_zero() (or == Duration::from_secs(0)); mention
PeriodicStateRepublishConfig and the interval field in the error so callers know
the bad config.

Comment on lines +2237 to +2245
impl Default for PeriodicStateRepublishConfig {
fn default() -> Self {
Self {
enabled: false,
interval: Self::default_interval(),
scope: RepublishScope::default(),
healthy_republish_every: Self::default_healthy_republish_every(),
max_publishes_per_second: 0,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Default enablement contradicts the stated feature contract.

PeriodicStateRepublishConfig::default() sets enabled: false, but this PR’s objective states periodic republish should default to enabled when DSX event bus is enabled. This currently ships the feature off-by-default unless every site opts in manually.

Suggested fix
 impl Default for PeriodicStateRepublishConfig {
     fn default() -> Self {
         Self {
-            enabled: false,
+            enabled: true,
             interval: Self::default_interval(),
             scope: RepublishScope::default(),
             healthy_republish_every: Self::default_healthy_republish_every(),
             max_publishes_per_second: 0,
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api-core/src/cfg/file.rs` around lines 2237 - 2245, The Default impl
for PeriodicStateRepublishConfig currently sets enabled: false which contradicts
the intended behavior; update PeriodicStateRepublishConfig::default() to set
enabled: true so periodic republish is on by default (keeping interval:
Self::default_interval(), scope: RepublishScope::default(),
healthy_republish_every: Self::default_healthy_republish_every(),
max_publishes_per_second: 0), and update any related tests/docs that assume the
feature is off by default. Ensure the change is applied in the Default
implementation and adjust unit tests or config consumers expecting the previous
off-by-default behavior.

@kfelternv kfelternv marked this pull request as ready for review June 10, 2026 19:04
@kfelternv kfelternv requested a review from a team as a code owner June 10, 2026 19:04
@github-actions

Copy link
Copy Markdown

deserialize_with = "deserialize_duration",
serialize_with = "as_std_duration"
)]
pub interval: std::time::Duration,

@ianderson-nvidia ianderson-nvidia Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is to be user configurable, you might want to set some const for minimum and maximum values for this duration. You can then clamp() to ensure values always fall within the acceptable range.

const PUBLISH_INTERVAL_MAX: u64 = 120
const PUBLISH_INTERVAL_MIN: u64 = 30

Then just use Durations::from_secs()

impl PeriodicStateRepublishConfig {
  pub fn publish_interval(&self) -> Duration {
    Duration::from_secs(self.interval.clamp(PUBLISH_INTERVAL_MIN, PUBLISH_INTERVAL_MAX))
}

pub healthy_republish_every: u32,

/// Upper bound on publishes per second within a single sweep, to avoid
/// bursting the broker on large sites. `0` (default) disables pacing and

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have it on by default. also is there stronger typing we can do? any Rate type?

sweep = sweep.wrapping_add(1);

tokio::select! {
_ = tokio::time::sleep(self.config.interval) => {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't account for the time it takes to perform the publish. we can use https://docs.rs/tokio/latest/tokio/time/fn.interval.html

include_instance_data: false,
host_health_config: self.host_health_config,
};
let snapshots = db::managed_host::load_all(&self.db_pool, options).await?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we load only the machines that this instance of nico is responsible for? this will end up with every instance publishing duplicate messages.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is doing that since the db:: is current instance of nico and is shared nothing (including db)

// One timestamp for the whole sweep: it marks when NICo asserts this
// state, not when each individual publish happened.
let timestamp = Utc::now();
let total = snapshots.len();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to stream this? is it common to load the whole managed host table in memory?

break;
}

let unhealthy = report_is_unhealthy(&snapshot.aggregate_health);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is_report_unhealthy is clearer that it returns a bool


if let Some(delay) = pacing {
tokio::select! {
_ = tokio::time::sleep(delay) => {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interval

@ajf ajf requested review from kensimon and yoks June 10, 2026 22:47
state: &ManagedHostState,
timestamp: DateTime<Utc>,
) {
let message = ManagedHostStateChangeMessage {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe want to rename this to just state message. not necessarily a change anymore.

}
};

let topic = format!("{topic_prefix}/{machine_id}/state");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this serialisation and publish be common across the regular on change client and this one? particularly the topic is standing out as duplicate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants