From 618aedb251cc8087a8060a78b5bb91c3db0b9161 Mon Sep 17 00:00:00 2001 From: Ivan Anisimov Date: Wed, 10 Jun 2026 14:49:43 -0700 Subject: [PATCH 1/5] feat: hw-health fetch non-sensor metrics data --- crates/bmc-mock/src/hw/bluefield3.rs | 1 + crates/bmc-mock/src/hw/dell_poweredge_r750.rs | 1 + crates/bmc-mock/src/hw/dgx_gb300_nvl.rs | 2 + crates/bmc-mock/src/hw/generic_ami.rs | 1 + crates/bmc-mock/src/hw/lenovo_gb300_nvl.rs | 2 + crates/bmc-mock/src/hw/liteon_power_shelf.rs | 1 + crates/bmc-mock/src/hw/nvidia_dgx_h100.rs | 2 + .../src/hw/nvidia_switch_nd5200_ld.rs | 1 + .../bmc-mock/src/hw/supermicro_gb300_nvl.rs | 2 + crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs | 12 + .../bmc-mock/src/redfish/computer_system.rs | 78 ++ crates/bmc-mock/src/redfish/mod.rs | 1 + crates/health/benches/collector_pipeline.rs | 6 +- crates/health/benches/processor_pipeline.rs | 7 +- crates/health/benches/sink_pipeline.rs | 4 +- crates/health/example/config.example.toml | 12 +- crates/health/src/collectors/mod.rs | 6 + crates/health/src/collectors/nmxt.rs | 4 +- .../nvue/gnmi/on_change_processor.rs | 6 +- .../collectors/nvue/gnmi/sample_processor.rs | 4 +- .../src/collectors/nvue/rest/collector.rs | 4 +- crates/health/src/collectors/sensors.rs | 731 +++--------------- crates/health/src/config.rs | 76 +- crates/health/src/discovery/cleanup.rs | 3 + crates/health/src/discovery/context.rs | 54 +- crates/health/src/discovery/spawn.rs | 220 +++++- crates/health/src/lib.rs | 11 +- crates/health/src/otlp/convert.rs | 8 +- crates/health/src/otlp/metrics_drain.rs | 6 +- crates/health/src/processor/health_report.rs | 14 +- crates/health/src/processor/leak_events.rs | 2 +- crates/health/src/processor/mod.rs | 2 +- crates/health/src/sink/events.rs | 8 +- crates/health/src/sink/mod.rs | 18 +- crates/health/src/sink/otlp.rs | 12 +- crates/health/src/sink/prometheus.rs | 4 +- 36 files changed, 603 insertions(+), 723 deletions(-) diff --git a/crates/bmc-mock/src/hw/bluefield3.rs b/crates/bmc-mock/src/hw/bluefield3.rs index f9c0f6bcba..e345e5e575 100644 --- a/crates/bmc-mock/src/hw/bluefield3.rs +++ b/crates/bmc-mock/src/hw/bluefield3.rs @@ -184,6 +184,7 @@ impl Bluefield3<'_> { }, })), storage: None, + processors: None, secure_boot_available: true, }], } diff --git a/crates/bmc-mock/src/hw/dell_poweredge_r750.rs b/crates/bmc-mock/src/hw/dell_poweredge_r750.rs index 1e0074b335..1c99fdc047 100644 --- a/crates/bmc-mock/src/hw/dell_poweredge_r750.rs +++ b/crates/bmc-mock/src/hw/dell_poweredge_r750.rs @@ -149,6 +149,7 @@ impl DellPowerEdgeR750<'_> { // there. So we provide empty collection to avoid 404 // failure. storage: Some(vec![]), + processors: None, secure_boot_available: true, base_bios: Some(redfish::bios::builder(&redfish::bios::resource(system_id)) .attributes(json!({ diff --git a/crates/bmc-mock/src/hw/dgx_gb300_nvl.rs b/crates/bmc-mock/src/hw/dgx_gb300_nvl.rs index eedcab0c12..b496b28e7a 100644 --- a/crates/bmc-mock/src/hw/dgx_gb300_nvl.rs +++ b/crates/bmc-mock/src/hw/dgx_gb300_nvl.rs @@ -172,6 +172,7 @@ impl DgxGB300Nvl<'_> { secure_boot_available: false, serial_number: Some(self.hgx_serial_number.to_string().into()), storage: None, + processors: None, }, redfish::computer_system::SingleSystemConfig { base_bios: Some(base_bios(system_id)), @@ -190,6 +191,7 @@ impl DgxGB300Nvl<'_> { secure_boot_available: true, serial_number: Some(self.system_0_serial_number.to_string().into()), storage: None, + processors: None, }, ], } diff --git a/crates/bmc-mock/src/hw/generic_ami.rs b/crates/bmc-mock/src/hw/generic_ami.rs index d157d5fb23..28b179ce60 100644 --- a/crates/bmc-mock/src/hw/generic_ami.rs +++ b/crates/bmc-mock/src/hw/generic_ami.rs @@ -94,6 +94,7 @@ impl GenericAmi<'_> { oem: redfish::computer_system::Oem::Generic, log_services: None, storage: None, + processors: None, base_bios: Some( redfish::bios::builder(&redfish::bios::resource(system_id)) .attributes(json!({"EndlessBoot":""})) diff --git a/crates/bmc-mock/src/hw/lenovo_gb300_nvl.rs b/crates/bmc-mock/src/hw/lenovo_gb300_nvl.rs index ddcbb2516d..eae716cb93 100644 --- a/crates/bmc-mock/src/hw/lenovo_gb300_nvl.rs +++ b/crates/bmc-mock/src/hw/lenovo_gb300_nvl.rs @@ -176,6 +176,7 @@ impl LenovoGB300Nvl<'_> { secure_boot_available: false, serial_number: Some(self.hgx_serial_number.to_string().into()), storage: None, + processors: None, }, redfish::computer_system::SingleSystemConfig { base_bios: Some(base_bios(system_id)), @@ -195,6 +196,7 @@ impl LenovoGB300Nvl<'_> { secure_boot_available: true, serial_number: Some(self.system_0_serial_number.to_string().into()), storage: None, + processors: None, }, ], } diff --git a/crates/bmc-mock/src/hw/liteon_power_shelf.rs b/crates/bmc-mock/src/hw/liteon_power_shelf.rs index 42badb2e8a..2ac861ea04 100644 --- a/crates/bmc-mock/src/hw/liteon_power_shelf.rs +++ b/crates/bmc-mock/src/hw/liteon_power_shelf.rs @@ -80,6 +80,7 @@ impl LiteOnPowerShelf<'_> { oem: redfish::computer_system::Oem::Generic, log_services: None, storage: None, + processors: None, base_bios: Some( redfish::bios::builder(&redfish::bios::resource(system_id)).build(), ), diff --git a/crates/bmc-mock/src/hw/nvidia_dgx_h100.rs b/crates/bmc-mock/src/hw/nvidia_dgx_h100.rs index 45fc01b0f3..adae9d2263 100644 --- a/crates/bmc-mock/src/hw/nvidia_dgx_h100.rs +++ b/crates/bmc-mock/src/hw/nvidia_dgx_h100.rs @@ -185,6 +185,7 @@ impl NvidiaDgxH100<'_> { base_bios: Some(base_bios(system_id)), log_services: None, storage: None, + processors: None, secure_boot_available: true, }, redfish::computer_system::SingleSystemConfig { @@ -202,6 +203,7 @@ impl NvidiaDgxH100<'_> { base_bios: None, log_services: None, storage: None, + processors: None, secure_boot_available: false, }, ], diff --git a/crates/bmc-mock/src/hw/nvidia_switch_nd5200_ld.rs b/crates/bmc-mock/src/hw/nvidia_switch_nd5200_ld.rs index ddc5a77f89..c563dfde14 100644 --- a/crates/bmc-mock/src/hw/nvidia_switch_nd5200_ld.rs +++ b/crates/bmc-mock/src/hw/nvidia_switch_nd5200_ld.rs @@ -79,6 +79,7 @@ impl NvidiaSwitchNd5200Ld<'_> { oem: redfish::computer_system::Oem::Generic, log_services: None, storage: Some(vec![]), + processors: None, base_bios: None, secure_boot_available: false, }], diff --git a/crates/bmc-mock/src/hw/supermicro_gb300_nvl.rs b/crates/bmc-mock/src/hw/supermicro_gb300_nvl.rs index 6a81563da7..5ebea83e5e 100644 --- a/crates/bmc-mock/src/hw/supermicro_gb300_nvl.rs +++ b/crates/bmc-mock/src/hw/supermicro_gb300_nvl.rs @@ -173,6 +173,7 @@ impl SupermicroGB300Nvl<'_> { secure_boot_available: false, serial_number: Some(self.hgx_serial_number.to_string().into()), storage: None, + processors: None, }, redfish::computer_system::SingleSystemConfig { base_bios: Some(base_bios(system_id)), @@ -191,6 +192,7 @@ impl SupermicroGB300Nvl<'_> { secure_boot_available: true, serial_number: Some(self.system_0_serial_number.to_string().into()), storage: None, + processors: None, }, ], } diff --git a/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs b/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs index a15d14ef62..4d4b6c7323 100644 --- a/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs +++ b/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs @@ -117,6 +117,7 @@ impl WiwynnGB200Nvl<'_> { ), log_services: None, storage: None, + processors: None, secure_boot_available: true, }, redfish::computer_system::SingleSystemConfig { @@ -134,6 +135,17 @@ impl WiwynnGB200Nvl<'_> { base_bios: None, log_services: None, storage: None, + processors: Some( + (0..4) + .map(|n| { + redfish::processor::gpu( + "HGX_Baseboard_0", + &format!("GPU_{n}"), + &format!("/redfish/v1/Chassis/HGX_GPU_{n}/Sensors/Voltage_1"), + ) + }) + .collect(), + ), secure_boot_available: false, }, ], diff --git a/crates/bmc-mock/src/redfish/computer_system.rs b/crates/bmc-mock/src/redfish/computer_system.rs index a440508eb7..5440c4181d 100644 --- a/crates/bmc-mock/src/redfish/computer_system.rs +++ b/crates/bmc-mock/src/redfish/computer_system.rs @@ -64,6 +64,7 @@ pub fn add_routes(r: Router, bmc_vendor: redfish::oem::BmcVendor) -> R const ETH_ID: &str = "{eth_id}"; const BOOT_OPTION_ID: &str = "{boot_option_id}"; const LOG_SERVICE_ID: &str = "{log_service_id}"; + const PROCESSOR_ID: &str = "{processor_id}"; let bios = redfish::bios::resource(SYSTEM_ID); r.route(&collection().odata_id, get(get_system_collection)) .route( @@ -112,6 +113,18 @@ pub fn add_routes(r: Router, bmc_vendor: redfish::oem::BmcVendor) -> R &redfish::storage::system_collection(SYSTEM_ID).odata_id, get(get_storage_collection), ) + .route( + &redfish::processor::system_collection(SYSTEM_ID).odata_id, + get(get_processors_collection), + ) + .route( + &redfish::processor::system_resource(SYSTEM_ID, PROCESSOR_ID).odata_id, + get(get_processor), + ) + .route( + &redfish::processor::metrics_resource(SYSTEM_ID, PROCESSOR_ID).odata_id, + get(get_processor_metrics), + ) .route( &bmc_vendor.make_settings_odata_id(&bios), patch(patch_bios_settings), @@ -136,6 +149,7 @@ pub struct SingleSystemConfig { pub base_bios: Option, pub log_services: Option>, pub storage: Option>, + pub processors: Option>, pub secure_boot_available: bool, pub oem: Oem, } @@ -231,6 +245,14 @@ impl SingleSystemState { } } + fn find_processor(&self, processor_id: &str) -> Option<&redfish::processor::Processor> { + self.config + .processors + .iter() + .flatten() + .find(|processor| processor.id == processor_id) + } + pub fn find_boot_option(&self, option_id: &str) -> Option<&redfish::boot_option::BootOption> { self.config .boot_options @@ -369,6 +391,11 @@ async fn get_system(State(state): State, Path(system_id): Path .is_some() .then_some(redfish::storage::system_collection(&system_id)); + let processors = config + .processors + .is_some() + .then_some(redfish::processor::system_collection(&system_id)); + let secure_boot = config .secure_boot_available .then_some(redfish::secure_boot::resource(&system_id)); @@ -381,6 +408,7 @@ async fn get_system(State(state): State, Path(system_id): Path .maybe_with(SystemBuilder::ethernet_interfaces, ðernet_interfaces) .maybe_with(SystemBuilder::log_services, &log_services) .maybe_with(SystemBuilder::storage, &storage) + .maybe_with(SystemBuilder::processors, &processors) .maybe_with(SystemBuilder::secure_boot, &secure_boot) .pcie_devices(&pcie_devices) .build() @@ -719,6 +747,52 @@ async fn get_storage_collection( .unwrap_or_else(http::not_found) } +async fn get_processors_collection( + State(state): State, + Path(system_id): Path, +) -> Response { + state + .system_state + .find(&system_id) + .and_then(|system_state| system_state.config.processors.as_ref()) + .map(|processors| { + let members = processors + .iter() + .map(|processor| { + redfish::processor::system_resource(&system_id, &processor.id).entity_ref() + }) + .collect::>(); + redfish::processor::system_collection(&system_id) + .with_members(&members) + .into_ok_response() + }) + .unwrap_or_else(http::not_found) +} + +async fn get_processor( + State(state): State, + Path((system_id, processor_id)): Path<(String, String)>, +) -> Response { + state + .system_state + .find(&system_id) + .and_then(|system_state| system_state.find_processor(&processor_id)) + .map(|processor| processor.to_json().into_ok_response()) + .unwrap_or_else(http::not_found) +} + +async fn get_processor_metrics( + State(state): State, + Path((system_id, processor_id)): Path<(String, String)>, +) -> Response { + state + .system_state + .find(&system_id) + .and_then(|system_state| system_state.find_processor(&processor_id)) + .map(|processor| processor.metrics_json().into_ok_response()) + .unwrap_or_else(http::not_found) +} + async fn get_bios(State(state): State, Path(system_id): Path) -> Response { state .system_state @@ -865,6 +939,10 @@ impl SystemBuilder { self.apply_patch(storage.nav_property("Storage")) } + pub fn processors(self, processors: &redfish::Collection<'_>) -> Self { + self.apply_patch(processors.nav_property("Processors")) + } + pub fn link_chassis(self, ids: &[Cow<'static, str>]) -> Self { let chassis = ids .iter() diff --git a/crates/bmc-mock/src/redfish/mod.rs b/crates/bmc-mock/src/redfish/mod.rs index 70c19df62d..3a68d13924 100644 --- a/crates/bmc-mock/src/redfish/mod.rs +++ b/crates/bmc-mock/src/redfish/mod.rs @@ -34,6 +34,7 @@ pub mod oem; pub mod pcie_device; pub mod power_subsystem; pub mod power_supply; +pub mod processor; pub mod resource; pub mod secure_boot; pub mod sensor; diff --git a/crates/health/benches/collector_pipeline.rs b/crates/health/benches/collector_pipeline.rs index 5d28892a7d..682470e63d 100644 --- a/crates/health/benches/collector_pipeline.rs +++ b/crates/health/benches/collector_pipeline.rs @@ -25,7 +25,7 @@ use carbide_health::endpoint::{BmcAddr, EndpointMetadata, MachineData}; use carbide_health::metrics::MetricsManager; use carbide_health::sink::{ CollectorEvent, CompositeDataSink, DataSink, EventContext, FirmwareInfo, LogRecord, - PrometheusSink, SensorHealthData, + MetricSample, PrometheusSink, }; use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; use mac_address::MacAddress; @@ -73,7 +73,7 @@ fn build_sensor_metric_event(idx: usize, unique_keys: usize) -> CollectorEvent { let rack_idx = idx % 4; CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: sensor_key.clone(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), @@ -94,7 +94,7 @@ fn build_sensor_metric_event(idx: usize, unique_keys: usize) -> CollectorEvent { fn build_nmxt_metric_event(idx: usize) -> CollectorEvent { CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: format!("effective_ber:{}", idx % 64), name: "switch_nmxt".to_string(), metric_type: "effective_ber".to_string(), diff --git a/crates/health/benches/processor_pipeline.rs b/crates/health/benches/processor_pipeline.rs index dc0f34a72f..d005eac8df 100644 --- a/crates/health/benches/processor_pipeline.rs +++ b/crates/health/benches/processor_pipeline.rs @@ -27,8 +27,7 @@ use carbide_health::processor::{ RackLeakProcessor, }; use carbide_health::sink::{ - CollectorEvent, CompositeDataSink, DataSink, EventContext, SensorHealthContext, - SensorHealthData, + CollectorEvent, CompositeDataSink, DataSink, EventContext, MetricSample, SensorThresholdContext, }; use carbide_uuid::rack::RackId; use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; @@ -123,7 +122,7 @@ fn metric_events( let sensor_name = format!("sensor-{sensor_idx}"); let reading = 20.0 + ((idx % 90) as f64); - let mut metric = SensorHealthData { + let mut metric = MetricSample { key: sensor_name.clone(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), @@ -137,7 +136,7 @@ fn metric_events( }; if with_health_context { - metric.context = Some(SensorHealthContext { + metric.context = Some(SensorThresholdContext { entity_type: "sensor".to_string(), sensor_id: sensor_name, upper_fatal: Some(90.0), diff --git a/crates/health/benches/sink_pipeline.rs b/crates/health/benches/sink_pipeline.rs index 8e6b949aa6..2ece1db4eb 100644 --- a/crates/health/benches/sink_pipeline.rs +++ b/crates/health/benches/sink_pipeline.rs @@ -25,7 +25,7 @@ use carbide_health::endpoint::{BmcAddr, EndpointMetadata, MachineData}; use carbide_health::metrics::MetricsManager; use carbide_health::sink::{ Classification, CollectorEvent, CompositeDataSink, DataSink, EventContext, HealthReport, - HealthReportSink, LogRecord, PrometheusSink, ReportSource, SensorHealthData, + HealthReportSink, LogRecord, MetricSample, PrometheusSink, ReportSource, }; use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; use health_report::HealthReport as CarbideHealthReport; @@ -84,7 +84,7 @@ fn metric_events(batch_size: usize, unique_keys: usize) -> Vec { let key = format!("sensor-{sensor_idx}"); CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: key.clone(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), diff --git a/crates/health/example/config.example.toml b/crates/health/example/config.example.toml index 66d7807a25..459babc400 100644 --- a/crates/health/example/config.example.toml +++ b/crates/health/example/config.example.toml @@ -22,6 +22,8 @@ shards_count = 1 cache_size = 100 bmc_proxy_url = "http://proxy.example.com:8080" +endpoint_discovery_interval = "5m" + # ============================================================================== # Endpoint Sources: Where to discover BMC endpoints from # ============================================================================== @@ -131,13 +133,19 @@ max_jitter = "40ms" # BMC Collectors: What data to collect from BMCs # ============================================================================== +[collectors.discovery] +refresh_interval = "60m" +discovery_concurrency = 1 + [collectors.sensors] -rediscover_interval = "5m" -state_refresh_interval = "30m" sensor_fetch_interval = "1m" sensor_fetch_concurrency = 10 include_sensor_thresholds = true +[collectors.metrics] +fetch_interval = "2m" +fetch_concurrency = 4 + [collectors.firmware] firmware_refresh_interval = "30m" diff --git a/crates/health/src/collectors/mod.rs b/crates/health/src/collectors/mod.rs index ecffec2bdd..6499644edf 100644 --- a/crates/health/src/collectors/mod.rs +++ b/crates/health/src/collectors/mod.rs @@ -15,7 +15,10 @@ * limitations under the License. */ +mod discovery; +mod entity_metrics; mod firmware; +pub(crate) mod inventory; mod leak_detector; mod logs; mod nmxt; @@ -23,7 +26,10 @@ mod nvue; mod runtime; mod sensors; +pub use discovery::{EntityDiscoveryCollector, EntityDiscoveryCollectorConfig}; +pub use entity_metrics::{MetricsCollector, MetricsCollectorConfig}; pub use firmware::{FirmwareCollector, FirmwareCollectorConfig}; +pub(crate) use inventory::SharedInventory; pub use leak_detector::{LeakDetectorCollector, LeakDetectorCollectorConfig}; pub(crate) use logs::auto::{AutoFailureBudget, BudgetDecision, FailureKind}; pub use logs::{ diff --git a/crates/health/src/collectors/nmxt.rs b/crates/health/src/collectors/nmxt.rs index 5237aa3585..7f762a2ed8 100644 --- a/crates/health/src/collectors/nmxt.rs +++ b/crates/health/src/collectors/nmxt.rs @@ -29,7 +29,7 @@ use crate::HealthError; use crate::collectors::{IterationResult, PeriodicCollector}; use crate::config::NmxtCollectorConfig as NmxtCollectorOptions; use crate::endpoint::{BmcEndpoint, EndpointMetadata}; -use crate::sink::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample}; /// default NMX-T port const NMXT_PORT: u16 = 9352; @@ -243,7 +243,7 @@ impl NmxtCollector { ]; self.emit_event(CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: metric_key, name: "switch_nmxt".to_string(), metric_type: metric_type.to_string(), diff --git a/crates/health/src/collectors/nvue/gnmi/on_change_processor.rs b/crates/health/src/collectors/nvue/gnmi/on_change_processor.rs index 5684413c7e..7aebb6b8f5 100644 --- a/crates/health/src/collectors/nvue/gnmi/on_change_processor.rs +++ b/crates/health/src/collectors/nvue/gnmi/on_change_processor.rs @@ -27,7 +27,7 @@ use super::proto::{self, PathElem}; use super::sample_processor::now_unix_secs; use super::subscriber::GnmiStreamMetrics; use crate::HealthError; -use crate::sink::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample}; type ParsedRow = HashMap; type CachedRows = HashMap; @@ -276,7 +276,7 @@ impl GnmiOnChangeProcessor { sink.handle_event( &self.event_context, - &CollectorEvent::Metric(Box::new(SensorHealthData { + &CollectorEvent::Metric(Box::new(MetricSample { key, name: self.collector_name.clone(), metric_type: "on_change_row".to_string(), @@ -437,7 +437,7 @@ mod tests { } } - fn metric_label<'a>(metric: &'a SensorHealthData, label: &str) -> Option<&'a str> { + fn metric_label<'a>(metric: &'a MetricSample, label: &str) -> Option<&'a str> { metric .labels .iter() diff --git a/crates/health/src/collectors/nvue/gnmi/sample_processor.rs b/crates/health/src/collectors/nvue/gnmi/sample_processor.rs index e474ad3aaa..20c06e3854 100644 --- a/crates/health/src/collectors/nvue/gnmi/sample_processor.rs +++ b/crates/health/src/collectors/nvue/gnmi/sample_processor.rs @@ -23,7 +23,7 @@ use std::time::Instant; use super::client::{typed_value_to_f64, typed_value_to_string}; use super::proto::{self, PathElem}; use super::subscriber::GnmiStreamMetrics; -use crate::sink::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample}; pub(crate) const NVUE_GNMI_SAMPLE_STREAM_ID: &str = "nvue_gnmi"; @@ -235,7 +235,7 @@ impl GnmiSampleProcessor { sink.handle_event( &self.event_context, - &CollectorEvent::Metric(Box::new(SensorHealthData { + &CollectorEvent::Metric(Box::new(MetricSample { key, name: NVUE_GNMI_SAMPLE_STREAM_ID.to_string(), metric_type: metric_type.to_string(), diff --git a/crates/health/src/collectors/nvue/rest/collector.rs b/crates/health/src/collectors/nvue/rest/collector.rs index efb4e2a243..2165a5f9d2 100644 --- a/crates/health/src/collectors/nvue/rest/collector.rs +++ b/crates/health/src/collectors/nvue/rest/collector.rs @@ -24,7 +24,7 @@ use crate::bmc::{CREDENTIAL_REFRESH_TIMEOUT, CredentialProvider, is_auth_error}; use crate::collectors::{IterationResult, PeriodicCollector}; use crate::config::NvueRestConfig; use crate::endpoint::{BmcAddr, BmcCredentials, BmcEndpoint, EndpointMetadata}; -use crate::sink::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample}; const COLLECTOR_NAME: &str = "nvue_rest"; @@ -329,7 +329,7 @@ impl NvueRestCollector { }; self.emit_event(CollectorEvent::Metric( - SensorHealthData { + MetricSample { key, name: COLLECTOR_NAME.to_string(), metric_type: metric_type.to_string(), diff --git a/crates/health/src/collectors/sensors.rs b/crates/health/src/collectors/sensors.rs index 746a751360..d05275d05c 100644 --- a/crates/health/src/collectors/sensors.rs +++ b/crates/health/src/collectors/sensors.rs @@ -16,29 +16,24 @@ */ use std::borrow::Cow; -use std::collections::HashSet; -use std::convert::identity; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; use futures::{StreamExt, stream}; -use nv_redfish::chassis::{Chassis, PowerSupply}; -use nv_redfish::computer_system::{ComputerSystem, Drive, Memory, Processor, Storage}; use nv_redfish::core::{Bmc, EntityTypeRef, ToSnakeCase}; use nv_redfish::sensor::SensorLink; -use nv_redfish::{Resource, ServiceRoot}; use crate::HealthError; -use crate::collectors::{IterationResult, PeriodicCollector}; -use crate::endpoint::{BmcAddr, BmcEndpoint}; -use crate::metrics::{MetricLabel, sanitize_unit}; -use crate::sink::{CollectorEvent, DataSink, EventContext, SensorHealthContext, SensorHealthData}; - -/// Configuration for sensor collector -pub struct SensorCollectorConfig { +use crate::collectors::inventory::{DiscoveredEntity, SharedInventory}; +use crate::collectors::runtime::{IterationResult, PeriodicCollector}; +use crate::endpoint::BmcEndpoint; +use crate::metrics::sanitize_unit; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample, SensorThresholdContext}; + +/// Configuration for the sensor collector. +pub struct SensorCollectorConfig { pub data_sink: Option>, - pub state_refresh_interval: Duration, + pub(crate) shared: SharedInventory, pub sensor_fetch_concurrency: usize, pub include_sensor_thresholds: bool, } @@ -46,257 +41,103 @@ pub struct SensorCollectorConfig { /// Sensor collector for a single BMC endpoint pub struct SensorCollector { endpoint: Arc, - bmc: Arc, event_context: EventContext, - state: Option>, + shared: SharedInventory, data_sink: Option>, - state_refresh_interval: Duration, sensor_fetch_concurrency: usize, include_sensor_thresholds: bool, } impl PeriodicCollector for SensorCollector { - type Config = SensorCollectorConfig; + type Config = SensorCollectorConfig; fn new_runner( - bmc: Arc, + _bmc: Arc, endpoint: Arc, config: Self::Config, ) -> Result { let event_context = EventContext::from_endpoint(endpoint.as_ref(), "sensor_collector"); Ok(Self { - bmc, endpoint, event_context, - state: None, + shared: config.shared, data_sink: config.data_sink, - state_refresh_interval: config.state_refresh_interval, - sensor_fetch_concurrency: config.sensor_fetch_concurrency, + sensor_fetch_concurrency: config.sensor_fetch_concurrency.max(1), include_sensor_thresholds: config.include_sensor_thresholds, }) } async fn run_iteration(&mut self) -> Result { - self.run_monitor_iteration().await - } - - fn collector_type(&self) -> &'static str { - "sensor_collector" - } - - async fn stop(&mut self) { - self.emit_event(CollectorEvent::CollectorRemoved); - } -} - -/// Monitored entity with its associated sensors -enum MonitoredEntity { - Processor { - entity: Arc>, - sensor: SensorLink, - system: Arc>, - }, - Memory { - entity: Arc>, - sensor: SensorLink, - system: Arc>, - }, - Drive { - entity: Arc>, - sensor: SensorLink, - system: Arc>, - storage: Arc>, - }, - PowerSupply { - entity: Arc>, - sensor: SensorLink, - chassis: Arc>, - }, - Chassis { - entity: Arc>, - sensor: SensorLink, - }, -} + let Some(inventory) = self.shared.load_full() else { + tracing::debug!( + bmc_addr = ?self.endpoint.addr, + "No entity inventory available yet; skipping sensor iteration" + ); + return Ok(IterationResult { + refresh_triggered: false, + entity_count: None, + fetch_failures: 0, + }); + }; -/// Trait for entities that can record sensor metrics -trait SensorRecordable { - fn metric_prefix(&self) -> &'static str; - fn sensor(&self) -> &SensorLink; - fn base_attributes(&self) -> Vec; - fn entity_specific_attributes(&self) -> Vec; - fn entity_metrics(&self, attributes: &[MetricLabel]) -> Vec; -} + tracing::debug!( + bmc_addr = ?self.endpoint.addr, + generation = inventory.generation, + inventory_age_secs = inventory.discovered_at.elapsed().as_secs(), + entity_count = inventory.entities.len(), + "Reading entity inventory snapshot for sensor iteration" + ); -impl SensorRecordable for MonitoredEntity { - fn metric_prefix(&self) -> &'static str { - "hw_sensor" - } + let fetch_failures = AtomicUsize::new(0); + self.emit_event(CollectorEvent::MetricCollectionStart); - fn sensor(&self) -> &SensorLink { - match self { - MonitoredEntity::Processor { sensor, .. } - | MonitoredEntity::Memory { sensor, .. } - | MonitoredEntity::Drive { sensor, .. } - | MonitoredEntity::PowerSupply { sensor, .. } - | MonitoredEntity::Chassis { sensor, .. } => sensor, + // Entity-level derived metrics (drive media life, PSU capacity), once + // per entity. + for entity in &inventory.entities { + self.emit_derived_metrics(entity); } - } - fn base_attributes(&self) -> Vec { - match self { - MonitoredEntity::Processor { entity, system, .. } => vec![ - (Cow::Borrowed("processor_id"), entity.raw().base.id.clone()), - (Cow::Borrowed("system_id"), system.raw().base.id.clone()), - ], - MonitoredEntity::Memory { entity, system, .. } => vec![ - (Cow::Borrowed("memory_id"), entity.raw().base.id.clone()), - (Cow::Borrowed("system_id"), system.raw().base.id.clone()), - ], - MonitoredEntity::Drive { - entity, - system, - storage, - .. - } => vec![ - (Cow::Borrowed("drive_id"), entity.raw().base.id.clone()), - (Cow::Borrowed("storage_id"), storage.raw().base.id.clone()), - (Cow::Borrowed("system_id"), system.raw().base.id.clone()), - ], - MonitoredEntity::PowerSupply { - entity, chassis, .. - } => vec![ - ( - Cow::Borrowed("powersupply_id"), - entity.raw().base.id.clone(), - ), - (Cow::Borrowed("chassis_id"), chassis.raw().base.id.clone()), - ], - MonitoredEntity::Chassis { entity, .. } => { - vec![(Cow::Borrowed("chassis_id"), entity.raw().base.id.clone())] - } - } - } + // Build the fetch futures borrowing from the shared snapshot, then + // drive them concurrently. Each future borrows `&self`, the entity, and + // its sensor (all alive for as long as `inventory` is held here). + let this = &*self; + let failures = &fetch_failures; + let futures: Vec<_> = inventory + .entities + .iter() + .flat_map(|entity| { + entity + .sensors() + .iter() + .map(move |sensor| this.update_sensor(entity, sensor, failures)) + }) + .collect(); - fn entity_specific_attributes(&self) -> Vec { - let mut attrs = Vec::new(); + let processed: usize = stream::iter(futures) + .buffer_unordered(self.sensor_fetch_concurrency) + .collect::>() + .await + .into_iter() + .sum(); - match self { - MonitoredEntity::Processor { entity, .. } => { - if let Some(processor_type) = entity.raw().processor_type.flatten() { - attrs.push(( - Cow::Borrowed("processor_type"), - processor_type.to_snake_case().to_string(), - )); - } - if let Some(model) = entity.raw().model.clone().flatten() { - attrs.push((Cow::Borrowed("model"), model)); - } - } - MonitoredEntity::Memory { entity, .. } => { - if let Some(device_type) = entity.raw().memory_device_type.flatten() { - attrs.push(( - Cow::Borrowed("device_type"), - device_type.to_snake_case().to_string(), - )); - } - if let Some(model) = entity.raw().model.clone().flatten() { - attrs.push((Cow::Borrowed("model"), model)); - } - } - MonitoredEntity::Drive { entity, .. } => { - if let Some(model) = entity.raw().model.clone().flatten() { - attrs.push((Cow::Borrowed("model"), model)); - } - } - MonitoredEntity::PowerSupply { entity, .. } => { - if let Some(model) = entity.raw().model.clone().flatten() { - attrs.push((Cow::Borrowed("model"), model)); - } - } - MonitoredEntity::Chassis { entity, .. } => { - if let Some(model) = entity.raw().model.clone().flatten() { - attrs.push((Cow::Borrowed("model"), model)); - } - } - } + self.emit_event(CollectorEvent::MetricCollectionEnd); - attrs + Ok(IterationResult { + refresh_triggered: false, + entity_count: Some(processed), + fetch_failures: fetch_failures.load(Ordering::Relaxed), + }) } - fn entity_metrics(&self, attributes: &[MetricLabel]) -> Vec { - match self { - MonitoredEntity::Drive { entity, .. } => { - if let Some(lifetime) = entity.raw().predicted_media_life_left_percent.flatten() { - vec![SensorHealthData { - key: entity.odata_id().to_string(), - name: "hw".to_string(), - metric_type: "drive_predicted_media_life_left".to_string(), - unit: "percentage".to_string(), - value: lifetime, - labels: attributes.to_vec(), - context: None, - }] - } else { - Vec::new() - } - } - MonitoredEntity::PowerSupply { entity, .. } => { - if let Some(capacity) = entity.raw().power_capacity_watts.flatten() { - vec![SensorHealthData { - key: entity.odata_id().to_string(), - name: "hw".to_string(), - metric_type: "powersupply_capacity".to_string(), - unit: "watts".to_string(), - value: capacity, - labels: attributes.to_vec(), - context: None, - }] - } else { - Vec::new() - } - } - _ => Vec::new(), - } + fn collector_type(&self) -> &'static str { + "sensor_collector" } -} -trait ResultExt { - fn log_and_ok( - self, - context: &str, - bmc_addr: &BmcAddr, - fetch_failures: &AtomicUsize, - ) -> Option - where - E: std::fmt::Debug; -} - -impl ResultExt for Result { - fn log_and_ok( - self, - context: &str, - bmc_addr: &BmcAddr, - fetch_failures: &AtomicUsize, - ) -> Option - where - E: std::fmt::Debug, - { - match self { - Ok(val) => Some(val), - Err(e) => { - fetch_failures.fetch_add(1, Ordering::Relaxed); - tracing::warn!(error = ?e, context, bmc_addr=?bmc_addr, "Operation failed"); - None - } - } + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); } } -struct SensorCollectorState { - entities: Vec>, - last_entity_refresh: Instant, -} - impl SensorCollector { fn emit_event(&self, event: CollectorEvent) { if let Some(data_sink) = &self.data_sink { @@ -304,401 +145,42 @@ impl SensorCollector { } } - async fn run_monitor_iteration(&mut self) -> Result { - let needs_entity_refresh = self - .state - .as_ref() - .map(|s| s.last_entity_refresh.elapsed() > self.state_refresh_interval) - .unwrap_or(true); - - let mut refresh_triggered = false; - let mut entity_count = None; - let fetch_failures = AtomicUsize::new(0); - - if needs_entity_refresh { - tracing::info!("Refreshing entity state for BMC: {}", self.endpoint.addr.ip); - match self.discover_entities(&fetch_failures).await { - Ok(entities) => { - let count = entities.len(); - tracing::info!("Entity refresh complete. Found {} entities", count); - - self.state = Some(SensorCollectorState { - entities, - last_entity_refresh: Instant::now(), - }); - refresh_triggered = true; - } - Err(e) => { - tracing::error!(error=?e, "Failed to discover entities"); - if self.state.is_none() { - return Err(e); - } - // Keep using old state if discovery fails - } - } - } - - if let Some(state) = &self.state { - let processed_sensors = self - .fetch_and_update_sensors(state, &fetch_failures) - .await?; - entity_count = Some(processed_sensors); + fn emit_derived_metrics(&self, entity: &DiscoveredEntity) { + let derived = entity.derived_metrics(); + if derived.is_empty() { + return; } - - Ok(IterationResult { - refresh_triggered, - entity_count, - fetch_failures: fetch_failures.load(Ordering::Relaxed), - }) - } - - async fn discover_processor_entities( - &self, - system: Arc>, - fetch_failures: &AtomicUsize, - ) -> Vec> { - let processors = system - .processors() - .await - .log_and_ok( - "Failed to get processors", - &self.endpoint.addr, - fetch_failures, - ) - .and_then(identity) - .unwrap_or_default(); - - stream::iter(processors) - .then(|processor| async move { - let processor = Arc::new(processor); - let env_sensors = processor - .environment_sensor_links() - .await - .log_and_ok( - "Failed to get processors enviroment sensors", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - let metric_sensors = processor - .metrics_sensor_links() - .await - .log_and_ok( - "Failed to get processors metric sensors", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - (processor, env_sensors.into_iter().chain(metric_sensors)) - }) - .flat_map(|(processor, sensors)| { - let system = system.clone(); - stream::iter(sensors.map(move |sensor| MonitoredEntity::Processor { - entity: processor.clone(), - sensor, - system: system.clone(), - })) - }) - .collect() - .await - } - - async fn discover_memory_entities( - &self, - system: Arc>, - fetch_failures: &AtomicUsize, - ) -> Vec> { - let memory_modules = system - .memory_modules() - .await - .log_and_ok( - "Failed to get memory modules", - &self.endpoint.addr, - fetch_failures, - ) - .and_then(identity) - .unwrap_or_default(); - - stream::iter(memory_modules) - .then(|memory| async move { - let memory = Arc::new(memory); - let env_sensors = memory - .environment_sensor_links() - .await - .log_and_ok( - "Failed to get memory enviroment sensors", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - (memory, env_sensors.into_iter()) - }) - .flat_map(|(memory, sensors)| { - let system = system.clone(); - stream::iter(sensors.map(move |sensor| MonitoredEntity::Memory { - entity: memory.clone(), - sensor, - system: system.clone(), - })) - }) - .collect() - .await - } - - async fn discover_drive_entities( - &self, - system: Arc>, - fetch_failures: &AtomicUsize, - ) -> Vec> { - let storage_list = system - .storage_controllers() - .await - .log_and_ok("Failed to get storage", &self.endpoint.addr, fetch_failures) - .and_then(identity) - .unwrap_or_default(); - - stream::iter(storage_list) - .then(|storage| async move { - let storage = Arc::new(storage); - let drives = storage - .drives() - .await - .log_and_ok("Failed to get drives", &self.endpoint.addr, fetch_failures) - .and_then(identity) - .unwrap_or_default(); - (storage, drives) - }) - .flat_map(|(storage, drives)| { - let system = system.clone(); - stream::iter(drives).then(move |drive| { - let storage = storage.clone(); - let system = system.clone(); - async move { - let drive = Arc::new(drive); - let env_sensors = drive - .environment_sensor_links() - .await - .log_and_ok( - "Failed to get drives enviroment sensors", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - (drive, storage, system, env_sensors.into_iter()) - } - }) - }) - .flat_map(|(drive, storage, system, sensors)| { - stream::iter(sensors.map(move |sensor| MonitoredEntity::Drive { - entity: drive.clone(), - sensor, - system: system.clone(), - storage: storage.clone(), - })) - }) - .collect() - .await - } - - async fn discover_power_supply_entities( - &self, - chassis: Arc>, - fetch_failures: &AtomicUsize, - ) -> Vec> { - let power_supplies = chassis - .power_supplies() - .await - .log_and_ok( - "Failed to get power supplies", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - - stream::iter(power_supplies) - .then(|ps| async move { - let ps = Arc::new(ps); - let metric_sensors = ps - .metrics_sensor_links() - .await - .log_and_ok( - "Failed to get power supplies metrics sensors", - &self.endpoint.addr, - fetch_failures, - ) - .unwrap_or_default(); - (ps, metric_sensors.into_iter()) - }) - .flat_map(|(ps, sensors)| { - let chassis = chassis.clone(); - stream::iter(sensors.map(move |sensor| MonitoredEntity::PowerSupply { - entity: ps.clone(), - sensor, - chassis: chassis.clone(), - })) - }) - .collect() - .await - } - - async fn discover_chassis_entities( - &self, - chassis: Arc>, - fetch_failures: &AtomicUsize, - ) -> Vec> { - match chassis.sensor_links().await { - Ok(Some(sensors)) => sensors - .into_iter() - .map(move |sensor| MonitoredEntity::Chassis { - entity: chassis.clone(), - sensor, - }) - .collect(), - Ok(None) => Vec::new(), - Err(error) => { - fetch_failures.fetch_add(1, Ordering::Relaxed); - tracing::warn!(error = ?error, bmc_addr=?self.endpoint.addr, "Failed to get chassis sensors"); - Vec::new() - } - } - } - - async fn discover_entities( - &self, - fetch_failures: &AtomicUsize, - ) -> Result>, HealthError> { - let service_root = ServiceRoot::new(self.bmc.clone()).await?; - - let mut entities = Vec::new(); - let mut sensor_ids = HashSet::new(); - - if let Some(systems) = service_root.systems().await? { - for system in systems.members().await? { - let system = Arc::new(system); - - for entity in self - .discover_processor_entities(system.clone(), fetch_failures) - .await - { - sensor_ids.insert(entity.sensor().odata_id().clone()); - entities.push(entity); - } - - for entity in self - .discover_memory_entities(system.clone(), fetch_failures) - .await - { - sensor_ids.insert(entity.sensor().odata_id().clone()); - entities.push(entity); - } - - for entity in self.discover_drive_entities(system, fetch_failures).await { - sensor_ids.insert(entity.sensor().odata_id().clone()); - entities.push(entity); - } - } - } - - if let Some(chassis_list) = service_root.chassis().await? { - for chassis in chassis_list.members().await? { - let chassis = Arc::new(chassis); - - for entity in self - .discover_power_supply_entities(chassis.clone(), fetch_failures) - .await - { - sensor_ids.insert(entity.sensor().odata_id().clone()); - entities.push(entity); - } - - for entity in self - .discover_chassis_entities(chassis, fetch_failures) - .await - { - // Only add not discovered sensors - if sensor_ids.insert(entity.sensor().odata_id().clone()) { - entities.push(entity); - } - } - } - } - - let validation_results: Vec<_> = stream::iter(entities) - .map(|entity| async move { - match entity.sensor().fetch().await { - Ok(sensor_data) => { - let is_valid = matches!( - ( - sensor_data.reading.flatten(), - sensor_data.reading_type.flatten(), - sensor_data.reading_units.as_ref().and_then(|u| u.as_ref()), - ), - (Some(_), Some(_), Some(units)) if !units.is_empty() - ); - (entity, is_valid) - } - // We will treat http errors as transient, and assume sensor is valid - Err(e) => { - fetch_failures.fetch_add(1, Ordering::Relaxed); - tracing::warn!(error = ?e, bmc_addr=?self.endpoint.addr, - "Could not get sensor data for validation, assuming sensor is valid"); - (entity, true) - } + let mut attributes = entity.base_attributes(); + attributes.extend(entity.entity_specific_attributes()); + for metric in derived { + self.emit_event(CollectorEvent::Metric( + MetricSample { + key: format!("{}/{}", entity.key(), metric.metric_type), + name: "hw".to_string(), + metric_type: metric.metric_type.to_string(), + unit: metric.unit.to_string(), + value: metric.value, + labels: attributes.clone(), + context: None, } - }) - .buffer_unordered(self.sensor_fetch_concurrency) - .collect() - .await; - - let mut validated_entities = Vec::new(); - for (entity, is_valid) in validation_results { - if is_valid { - validated_entities.push(entity); - } + .into(), + )); } - - tracing::info!( - bmc = %self.endpoint.addr.mac, - total_valid = validated_entities.len(), - "Discovered hardware entities with sensors" - ); - - Ok(validated_entities) - } - - async fn fetch_and_update_sensors( - &self, - state: &SensorCollectorState, - fetch_failures: &AtomicUsize, - ) -> Result { - self.emit_event(CollectorEvent::MetricCollectionStart); - let futures: Vec<_> = state - .entities - .iter() - .map(|entity| self.update_sensor(entity, fetch_failures)) - .collect(); - - let processed: Vec<_> = stream::iter(futures) - .buffer_unordered(self.sensor_fetch_concurrency) - .collect() - .await; - self.emit_event(CollectorEvent::MetricCollectionEnd); - - Ok(processed.into_iter().sum()) } async fn update_sensor( &self, - entity: &MonitoredEntity, + entity: &DiscoveredEntity, + sensor_link: &SensorLink, fetch_failures: &AtomicUsize, ) -> usize { - let sensor = match entity.sensor().fetch().await { + let sensor = match sensor_link.fetch().await { Ok(s) => s, Err(e) => { fetch_failures.fetch_add(1, Ordering::Relaxed); tracing::warn!( - sensor_id = %entity.sensor().odata_id(), - entity_type = entity.metric_prefix(), + sensor_id = %sensor_link.odata_id(), + entity_type = entity.entity_type(), error = ?e, "Failed to fetch sensor data" ); @@ -713,8 +195,7 @@ impl SensorCollector { else { tracing::debug!( sensor_id = %sensor.base.id, - entity_type = entity.metric_prefix(), - sensor = ?sensor, + entity_type = entity.entity_type(), "Sensor does not have health status field, skipping" ); return 0; @@ -730,8 +211,7 @@ impl SensorCollector { else { tracing::warn!( sensor_id = %sensor.base.id, - entity_type = entity.metric_prefix(), - sensor = ?sensor, + entity_type = entity.entity_type(), "Sensor missing required fields (reading, reading_type, or units)" ); return 0; @@ -770,16 +250,7 @@ impl SensorCollector { .physical_context .flatten() .map(|phc| phc.to_snake_case().to_string()) - .unwrap_or_else(|| { - match entity { - MonitoredEntity::Processor { .. } => "cpu", - MonitoredEntity::Memory { .. } => "memory", - MonitoredEntity::Drive { .. } => "storage_device", - MonitoredEntity::PowerSupply { .. } => "power_supply", - MonitoredEntity::Chassis { .. } => "chassis", - } - .to_string() - }); + .unwrap_or_else(|| entity.physical_context_fallback().to_string()); attributes.push((Cow::Borrowed("physical_context"), physical_context)); attributes.extend(entity.entity_specific_attributes()); @@ -824,18 +295,16 @@ impl SensorCollector { (None, None, None, None, None, None) }; - let derived_metrics = entity.entity_metrics(&attributes); - self.emit_event(CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: sensor.odata_id().to_string(), name: "hw_sensor".to_string(), metric_type, unit, value: reading, labels: attributes, - context: Some(SensorHealthContext { - entity_type: entity.metric_prefix().replace("hw_", ""), + context: Some(SensorThresholdContext { + entity_type: entity.entity_type().to_string(), sensor_id: sensor.base.id.clone(), upper_fatal, lower_fatal, @@ -851,10 +320,6 @@ impl SensorCollector { .into(), )); - for metric in derived_metrics { - self.emit_event(CollectorEvent::Metric(metric.into())); - } - 1 } } diff --git a/crates/health/src/config.rs b/crates/health/src/config.rs index d63c7eb217..81f1183d28 100644 --- a/crates/health/src/config.rs +++ b/crates/health/src/config.rs @@ -49,6 +49,10 @@ pub struct Config { /// Maximum cache size per BMC, uses etags pub cache_size: usize, + /// Interval between BMC endpoint discovery iterations. + #[serde(with = "humantime_serde")] + pub endpoint_discovery_interval: Duration, + /// BMC proxy URL pub bmc_proxy_url: Option, } @@ -65,6 +69,7 @@ impl Default for Config { shard: 0, shards_count: 1, cache_size: 100, + endpoint_discovery_interval: Duration::from_secs(300), bmc_proxy_url: None, } } @@ -437,10 +442,16 @@ pub struct RateLimitConfig { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct CollectorsConfig { + /// Entity discovery configuration + pub discovery: DiscoveryConfig, + /// Sensor collector configuration (if present, sensor collector is enabled) #[serde(alias = "health")] pub sensors: Configurable, + /// Entity metrics collector configuration (if present, metrics collector is enabled) + pub metrics: Configurable, + /// Firmware collector configuration (if present, firmware collector is enabled) pub firmware: Configurable, @@ -460,7 +471,9 @@ pub struct CollectorsConfig { impl Default for CollectorsConfig { fn default() -> Self { Self { + discovery: DiscoveryConfig::default(), sensors: Configurable::Enabled(SensorCollectorConfig::default()), + metrics: Configurable::Disabled, firmware: Configurable::Disabled, leak_detector: Configurable::Enabled(LeakDetectorCollectorConfig::default()), logs: Configurable::Disabled, @@ -470,6 +483,42 @@ impl Default for CollectorsConfig { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct DiscoveryConfig { + #[serde(with = "humantime_serde")] + pub refresh_interval: Duration, + + pub discovery_concurrency: usize, +} + +impl Default for DiscoveryConfig { + fn default() -> Self { + Self { + refresh_interval: Duration::from_secs(300), + discovery_concurrency: 4, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct MetricsCollectorConfig { + #[serde(with = "humantime_serde")] + pub fetch_interval: Duration, + + pub fetch_concurrency: usize, +} + +impl Default for MetricsCollectorConfig { + fn default() -> Self { + Self { + fetch_interval: Duration::from_secs(120), + fetch_concurrency: 4, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct ProcessorsConfig { @@ -523,14 +572,6 @@ impl Default for RackLeakProcessorConfig { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct SensorCollectorConfig { - /// Interval between BMC endpoint rediscovery. - #[serde(with = "humantime_serde")] - pub rediscover_interval: Duration, - - /// Interval between entity state refresh. - #[serde(with = "humantime_serde")] - pub state_refresh_interval: Duration, - /// Interval between sensor fetch iterations. #[serde(with = "humantime_serde")] pub sensor_fetch_interval: Duration, @@ -545,8 +586,6 @@ pub struct SensorCollectorConfig { impl Default for SensorCollectorConfig { fn default() -> Self { Self { - rediscover_interval: Duration::from_secs(300), - state_refresh_interval: Duration::from_secs(9000), sensor_fetch_interval: Duration::from_secs(60), sensor_fetch_concurrency: 4, include_sensor_thresholds: true, @@ -1006,6 +1045,10 @@ impl Config { )); } + if self.endpoint_discovery_interval.is_zero() { + return Err("endpoint_discovery_interval must be greater than 0".to_string()); + } + if let Configurable::Enabled(rate_limit) = &self.rate_limit && rate_limit.bucket_replenish.is_zero() { @@ -1188,7 +1231,6 @@ mod tests { assert!(config.sinks.prometheus.is_enabled()); if let Configurable::Enabled(ref sensors) = config.collectors.sensors { - assert_eq!(sensors.rediscover_interval, Duration::from_secs(300)); assert_eq!(sensors.sensor_fetch_concurrency, 10); } else { panic!("sensors empty") @@ -1238,6 +1280,7 @@ mod tests { assert_eq!(config.shards_count, 1); assert_eq!(config.cache_size, 100); + assert_eq!(config.endpoint_discovery_interval, Duration::from_secs(300)); if let Configurable::Enabled(ref nvue) = config.collectors.nvue { if let Configurable::Enabled(ref rest) = nvue.rest { @@ -1262,6 +1305,8 @@ mod tests { #[test] fn test_static_only_config() { let toml_content = r#" +endpoint_discovery_interval = "1m" + [[endpoint_sources.static_bmc_endpoints]] ip = "192.168.1.100" mac = "00:11:22:33:44:55" @@ -1275,9 +1320,7 @@ enabled = false enabled = false [collectors.sensors] -rediscover_interval = "1m" sensor_fetch_interval = "30s" -state_refresh_interval = "10m" sensor_fetch_concurrency = 5 include_sensor_thresholds = false @@ -1309,6 +1352,7 @@ cache_size = 50 ); assert_eq!(config.metrics.prefix, "carbide_hardware_new_health"); + assert_eq!(config.endpoint_discovery_interval, Duration::from_secs(60)); if let Configurable::Enabled(ref rate_limit) = config.rate_limit { assert_eq!(rate_limit.bucket_replenish, Duration::from_millis(30)); @@ -1320,7 +1364,6 @@ cache_size = 50 assert!(config.collectors.sensors.is_enabled()); if let Configurable::Enabled(ref sensors) = config.collectors.sensors { - assert_eq!(sensors.rediscover_interval, Duration::from_secs(60)); assert_eq!(sensors.sensor_fetch_interval, Duration::from_secs(30)); assert!(!sensors.include_sensor_thresholds); } else { @@ -1349,6 +1392,11 @@ cache_size = 50 config.shards_count = 1; assert!(config.validate().is_ok()); + config.endpoint_discovery_interval = Duration::from_secs(0); + assert!(config.validate().is_err()); + config.endpoint_discovery_interval = Duration::from_secs(300); + assert!(config.validate().is_ok()); + config.rate_limit = Configurable::Enabled(RateLimitConfig { bucket_burst: 200, bucket_replenish: Duration::from_secs(0), diff --git a/crates/health/src/discovery/cleanup.rs b/crates/health/src/discovery/cleanup.rs index e5a2c04ca6..5dba8d0728 100644 --- a/crates/health/src/discovery/cleanup.rs +++ b/crates/health/src/discovery/cleanup.rs @@ -49,6 +49,9 @@ pub(super) fn stop_removed_bmc_collectors( for kind in CollectorKind::ALL { stop_collectors_for_keys(ctx, kind, &removed_keys); } + for key in &removed_keys { + ctx.collectors.remove_inventory(key); + } if !removed_keys.is_empty() { tracing::info!( diff --git a/crates/health/src/discovery/context.rs b/crates/health/src/discovery/context.rs index 6f57234ef1..9a1948d27d 100644 --- a/crates/health/src/discovery/context.rs +++ b/crates/health/src/discovery/context.rs @@ -19,22 +19,27 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use arc_swap::ArcSwapOption; use prometheus::{Histogram, HistogramOpts}; use crate::HealthError; -use crate::collectors::{Collector, LogDowngradeRegistry}; +use crate::bmc::BmcClient; +use crate::collectors::{Collector, LogDowngradeRegistry, SharedInventory}; use crate::config::{ - Config, Configurable, FirmwareCollectorConfig as FirmwareCollectorOptions, + Config, Configurable, DiscoveryConfig, FirmwareCollectorConfig as FirmwareCollectorOptions, LeakDetectorCollectorConfig as LeakDetectorCollectorOptions, - LogsCollectorConfig as LogsCollectorOptions, NmxtCollectorConfig as NmxtCollectorOptions, - NvueCollectorConfig as NvueCollectorOptions, SensorCollectorConfig as SensorCollectorOptions, + LogsCollectorConfig as LogsCollectorOptions, MetricsCollectorConfig as MetricsCollectorOptions, + NmxtCollectorConfig as NmxtCollectorOptions, NvueCollectorConfig as NvueCollectorOptions, + SensorCollectorConfig as SensorCollectorOptions, }; use crate::limiter::RateLimiter; use crate::metrics::{MetricsManager, operation_duration_buckets_seconds}; #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] pub(super) enum CollectorKind { + Discovery, Sensor, + Metrics, Logs, Firmware, LeakDetector, @@ -44,8 +49,10 @@ pub(super) enum CollectorKind { } impl CollectorKind { - pub(super) const ALL: [CollectorKind; 7] = [ + pub(super) const ALL: [CollectorKind; 9] = [ + CollectorKind::Discovery, CollectorKind::Sensor, + CollectorKind::Metrics, CollectorKind::Logs, CollectorKind::Firmware, CollectorKind::LeakDetector, @@ -56,7 +63,11 @@ impl CollectorKind { pub(super) fn stop_message(self) -> &'static str { match self { + CollectorKind::Discovery => { + "Stopping entity discovery collector for removed BMC endpoint" + } CollectorKind::Sensor => "Stopping sensor collector for removed BMC endpoint", + CollectorKind::Metrics => "Stopping entity metrics collector for removed BMC endpoint", CollectorKind::Logs => "Stopping logs collector for removed BMC endpoint", CollectorKind::Firmware => "Stopping firmware collector for removed BMC endpoint", CollectorKind::LeakDetector => { @@ -72,31 +83,39 @@ impl CollectorKind { } pub(super) struct CollectorState { + discovery: HashMap, Collector>, sensors: HashMap, Collector>, + metrics: HashMap, Collector>, firmware: HashMap, Collector>, leak_detector: HashMap, Collector>, logs: HashMap, Collector>, nmxt: HashMap, Collector>, nvue_rest: HashMap, Collector>, nvue_gnmi: HashMap, Collector>, + inventories: HashMap, SharedInventory>, } impl CollectorState { fn new() -> Self { Self { + discovery: HashMap::new(), sensors: HashMap::new(), + metrics: HashMap::new(), firmware: HashMap::new(), leak_detector: HashMap::new(), logs: HashMap::new(), nmxt: HashMap::new(), nvue_rest: HashMap::new(), nvue_gnmi: HashMap::new(), + inventories: HashMap::new(), } } fn map(&self, kind: CollectorKind) -> &HashMap, Collector> { match kind { + CollectorKind::Discovery => &self.discovery, CollectorKind::Sensor => &self.sensors, + CollectorKind::Metrics => &self.metrics, CollectorKind::Logs => &self.logs, CollectorKind::Firmware => &self.firmware, CollectorKind::LeakDetector => &self.leak_detector, @@ -111,7 +130,9 @@ impl CollectorState { kind: CollectorKind, ) -> &mut HashMap, Collector> { match kind { + CollectorKind::Discovery => &mut self.discovery, CollectorKind::Sensor => &mut self.sensors, + CollectorKind::Metrics => &mut self.metrics, CollectorKind::Logs => &mut self.logs, CollectorKind::Firmware => &mut self.firmware, CollectorKind::LeakDetector => &mut self.leak_detector, @@ -121,6 +142,21 @@ impl CollectorState { } } + pub(super) fn inventory_for(&mut self, key: &str) -> SharedInventory { + if let Some(shared) = self.inventories.get(key) { + return shared.clone(); + } + let shared = Arc::new(ArcSwapOption::empty()); + self.inventories + .insert(Cow::Owned(key.to_string()), shared.clone()); + shared + } + + /// Drop the shared inventory handle for a removed endpoint. + pub(super) fn remove_inventory(&mut self, key: &str) { + self.inventories.remove(key); + } + pub(super) fn contains(&self, kind: CollectorKind, key: &str) -> bool { self.map(kind).contains_key(key) } @@ -142,8 +178,10 @@ impl CollectorState { &self, active_keys: &HashSet>, ) -> HashSet> { - self.sensors + self.discovery .keys() + .chain(self.sensors.keys()) + .chain(self.metrics.keys()) .chain(self.logs.keys()) .chain(self.firmware.keys()) .chain(self.leak_detector.keys()) @@ -176,7 +214,9 @@ pub struct DiscoveryLoopContext { pub(crate) discovery_endpoint_fetch_histogram: Histogram, pub(crate) limiter: Arc, pub(crate) metrics_manager: Arc, + pub(crate) discovery_config: DiscoveryConfig, pub(crate) sensors_config: Configurable, + pub(crate) metrics_config: Configurable, pub(crate) logs_config: Configurable, pub(crate) firmware_config: Configurable, pub(crate) leak_detector_config: Configurable, @@ -219,7 +259,9 @@ impl DiscoveryLoopContext { discovery_endpoint_fetch_histogram, limiter, metrics_manager, + discovery_config: config.collectors.discovery.clone(), sensors_config: config.collectors.sensors.clone(), + metrics_config: config.collectors.metrics.clone(), logs_config: config.collectors.logs.clone(), firmware_config: config.collectors.firmware.clone(), leak_detector_config: config.collectors.leak_detector.clone(), diff --git a/crates/health/src/discovery/spawn.rs b/crates/health/src/discovery/spawn.rs index e5d8aaecee..107c90a882 100644 --- a/crates/health/src/discovery/spawn.rs +++ b/crates/health/src/discovery/spawn.rs @@ -24,8 +24,9 @@ use crate::HealthError; use crate::bmc::BmcClient; use crate::collectors::{ AutoFailureBudget, BackoffConfig, BudgetDecision, Collector, CollectorStartContext, - FailureKind, FirmwareCollector, FirmwareCollectorConfig, LeakDetectorCollector, - LeakDetectorCollectorConfig, LogsCollector, LogsCollectorConfig, NmxtCollector, + EntityDiscoveryCollector, EntityDiscoveryCollectorConfig, FailureKind, FirmwareCollector, + FirmwareCollectorConfig, LeakDetectorCollector, LeakDetectorCollectorConfig, LogsCollector, + LogsCollectorConfig, MetricsCollector, MetricsCollectorConfig, NmxtCollector, NmxtCollectorConfig, NvueRestCollector, NvueRestCollectorConfig, SensorCollector, SensorCollectorConfig, SseLogCollector, SseLogCollectorConfig, StreamingCollectorStartContext, spawn_gnmi_collector, @@ -63,9 +64,54 @@ fn spawn_generic_redfish_collectors( let endpoint_arc = endpoint.clone(); let bmc = endpoint.bmc().clone(); + let sensors_enabled = matches!(ctx.sensors_config, Configurable::Enabled(_)); + let metrics_enabled = matches!(ctx.metrics_config, Configurable::Enabled(_)); + + if (sensors_enabled || metrics_enabled) + && !ctx.collectors.contains(CollectorKind::Discovery, &key) + { + let shared = ctx.collectors.inventory_for(&key); + let collector_registry = Arc::new(ctx.metrics_manager.create_collector_registry( + format!("entity_discovery_collector_{key}"), + metrics_prefix, + )?); + match Collector::start::>( + endpoint_arc.clone(), + bmc.clone(), + EntityDiscoveryCollectorConfig { + shared, + discovery_concurrency: ctx.discovery_config.discovery_concurrency, + }, + CollectorStartContext { + limiter: ctx.limiter.clone(), + iteration_interval: ctx.discovery_config.refresh_interval, + collector_registry, + metrics_manager: ctx.metrics_manager.clone(), + }, + ) { + Ok(monitor) => { + ctx.collectors + .insert(CollectorKind::Discovery, key.clone().into(), monitor); + tracing::info!( + endpoint_key = %key, + total_collectors = ctx.collectors.len(CollectorKind::Discovery), + "Started entity discovery for BMC endpoint" + ); + } + Err(error) => { + tracing::error!( + ?error, + "Could not start entity discovery collector for: {:?}", + endpoint.addr + ); + } + } + } + if let Configurable::Enabled(sensor_cfg) = &ctx.sensors_config && !ctx.collectors.contains(CollectorKind::Sensor, &key) { + let shared = ctx.collectors.inventory_for(&key); let collector_registry = Arc::new( ctx.metrics_manager .create_collector_registry(format!("sensor_collector_{key}"), metrics_prefix)?, @@ -75,7 +121,7 @@ fn spawn_generic_redfish_collectors( bmc.clone(), SensorCollectorConfig { data_sink: data_sink.clone(), - state_refresh_interval: sensor_cfg.state_refresh_interval, + shared, sensor_fetch_concurrency: sensor_cfg.sensor_fetch_concurrency, include_sensor_thresholds: sensor_cfg.include_sensor_thresholds, }, @@ -105,6 +151,48 @@ fn spawn_generic_redfish_collectors( } } + if let Configurable::Enabled(metrics_cfg) = &ctx.metrics_config + && !ctx.collectors.contains(CollectorKind::Metrics, &key) + { + let shared = ctx.collectors.inventory_for(&key); + let collector_registry = Arc::new( + ctx.metrics_manager + .create_collector_registry(format!("metrics_collector_{key}"), metrics_prefix)?, + ); + match Collector::start::>( + endpoint_arc.clone(), + bmc.clone(), + MetricsCollectorConfig { + data_sink: data_sink.clone(), + shared, + fetch_concurrency: metrics_cfg.fetch_concurrency, + }, + CollectorStartContext { + limiter: ctx.limiter.clone(), + iteration_interval: metrics_cfg.fetch_interval, + collector_registry, + metrics_manager: ctx.metrics_manager.clone(), + }, + ) { + Ok(monitor) => { + ctx.collectors + .insert(CollectorKind::Metrics, key.clone().into(), monitor); + tracing::info!( + endpoint_key = %key, + total_collectors = ctx.collectors.len(CollectorKind::Metrics), + "Started entity metrics collection for BMC endpoint" + ); + } + Err(error) => { + tracing::error!( + ?error, + "Could not start entity metrics collector for: {:?}", + endpoint.addr + ); + } + } + } + if let Configurable::Enabled(logs_cfg) = &ctx.logs_config && !ctx.collectors.contains(CollectorKind::Logs, &key) { @@ -864,6 +952,132 @@ mod tests { assert_eq!(ctx.collectors.len(CollectorKind::NvueGnmi), 0) } + #[tokio::test] + async fn machine_endpoint_with_sensors_starts_discovery_and_sensor_only() { + let mut config = Config::default(); + config.collectors.sensors = Configurable::Enabled(Default::default()); + config.collectors.metrics = Configurable::Disabled; + config.collectors.logs = Configurable::Disabled; + config.collectors.firmware = Configurable::Disabled; + config.collectors.leak_detector = Configurable::Disabled; + config.collectors.nmxt = Configurable::Disabled; + config.collectors.nvue = Configurable::Disabled; + + let mut ctx = context_with_config(config, "test_discovery_with_sensors"); + let endpoint = test_endpoint( + Ipv4Addr::new(10, 0, 0, 20), + "aa:bb:cc:00:00:20", + Some(machine_metadata()), + ); + + spawn_collectors_for_endpoint( + &mut ctx, + &endpoint, + Some(Arc::new(NoopSink)), + "test_discovery_with_sensors", + ) + .expect("spawn should succeed"); + + assert_eq!(ctx.collectors.len(CollectorKind::Discovery), 1); + assert_eq!(ctx.collectors.len(CollectorKind::Sensor), 1); + assert_eq!(ctx.collectors.len(CollectorKind::Metrics), 0); + } + + #[tokio::test] + async fn metrics_only_still_starts_discovery() { + let mut config = Config::default(); + config.collectors.sensors = Configurable::Disabled; + config.collectors.metrics = Configurable::Enabled(Default::default()); + config.collectors.logs = Configurable::Disabled; + config.collectors.firmware = Configurable::Disabled; + config.collectors.leak_detector = Configurable::Disabled; + config.collectors.nmxt = Configurable::Disabled; + config.collectors.nvue = Configurable::Disabled; + + let mut ctx = context_with_config(config, "test_discovery_with_metrics_only"); + let endpoint = test_endpoint( + Ipv4Addr::new(10, 0, 0, 21), + "aa:bb:cc:00:00:21", + Some(machine_metadata()), + ); + + spawn_collectors_for_endpoint( + &mut ctx, + &endpoint, + Some(Arc::new(NoopSink)), + "test_discovery_with_metrics_only", + ) + .expect("spawn should succeed"); + + assert_eq!(ctx.collectors.len(CollectorKind::Discovery), 1); + assert_eq!(ctx.collectors.len(CollectorKind::Sensor), 0); + assert_eq!(ctx.collectors.len(CollectorKind::Metrics), 1); + } + + #[tokio::test] + async fn no_discovery_when_both_readers_disabled() { + let mut config = Config::default(); + config.collectors.sensors = Configurable::Disabled; + config.collectors.metrics = Configurable::Disabled; + config.collectors.logs = Configurable::Disabled; + config.collectors.firmware = Configurable::Disabled; + config.collectors.leak_detector = Configurable::Disabled; + config.collectors.nmxt = Configurable::Disabled; + config.collectors.nvue = Configurable::Disabled; + + let mut ctx = context_with_config(config, "test_no_discovery"); + let endpoint = test_endpoint( + Ipv4Addr::new(10, 0, 0, 22), + "aa:bb:cc:00:00:22", + Some(machine_metadata()), + ); + + spawn_collectors_for_endpoint(&mut ctx, &endpoint, Some(Arc::new(NoopSink)), "test") + .expect("spawn should succeed"); + + assert_eq!(ctx.collectors.len(CollectorKind::Discovery), 0); + assert_eq!(ctx.collectors.len(CollectorKind::Sensor), 0); + assert_eq!(ctx.collectors.len(CollectorKind::Metrics), 0); + } + + #[tokio::test] + async fn discovery_sensor_and_metrics_spawn_is_idempotent() { + let mut config = Config::default(); + config.collectors.sensors = Configurable::Enabled(Default::default()); + config.collectors.metrics = Configurable::Enabled(Default::default()); + config.collectors.logs = Configurable::Disabled; + config.collectors.firmware = Configurable::Disabled; + config.collectors.leak_detector = Configurable::Disabled; + config.collectors.nmxt = Configurable::Disabled; + config.collectors.nvue = Configurable::Disabled; + + let mut ctx = context_with_config(config, "test_discovery_idempotent"); + let endpoint = test_endpoint( + Ipv4Addr::new(10, 0, 0, 23), + "aa:bb:cc:00:00:23", + Some(machine_metadata()), + ); + + spawn_collectors_for_endpoint( + &mut ctx, + &endpoint, + Some(Arc::new(NoopSink)), + "test_discovery_idempotent", + ) + .expect("first spawn should succeed"); + spawn_collectors_for_endpoint( + &mut ctx, + &endpoint, + Some(Arc::new(NoopSink)), + "test_discovery_idempotent", + ) + .expect("second spawn should be a no-op without duplicate registry errors"); + + assert_eq!(ctx.collectors.len(CollectorKind::Discovery), 1); + assert_eq!(ctx.collectors.len(CollectorKind::Sensor), 1); + assert_eq!(ctx.collectors.len(CollectorKind::Metrics), 1); + } + fn auto_mode_config() -> Config { let mut config = Config::default(); config.collectors.sensors = Configurable::Disabled; diff --git a/crates/health/src/lib.rs b/crates/health/src/lib.rs index 85c1ed7e95..81ae084b9c 100644 --- a/crates/health/src/lib.rs +++ b/crates/health/src/lib.rs @@ -16,7 +16,6 @@ */ use std::sync::Arc; -use std::time::Duration; use nv_redfish::bmc_http::reqwest::{ BmcError, Client as ReqwestClient, ClientParams as ReqwestClientParams, @@ -332,15 +331,7 @@ pub async fn run_service(config: Config) -> Result<(), HealthError> { .set(stats.sharded_endpoints as f64); active_endpoints_gauge.set(stats.active_monitors as f64); - tokio::time::sleep( - config - .collectors - .sensors - .as_option() - .map(|s| s.rediscover_interval) - .unwrap_or(Duration::from_secs(300)), - ) - .await; + tokio::time::sleep(config.endpoint_discovery_interval).await; } } }); diff --git a/crates/health/src/otlp/convert.rs b/crates/health/src/otlp/convert.rs index 6641762951..e83a979049 100644 --- a/crates/health/src/otlp/convert.rs +++ b/crates/health/src/otlp/convert.rs @@ -28,7 +28,7 @@ use super::metrics::{ }; use super::resource::Resource; use crate::endpoint::SwitchEndpointRole; -use crate::sink::{CollectorEvent, EventContext, SensorHealthData}; +use crate::sink::{CollectorEvent, EventContext, MetricSample}; fn severity_text_to_number(severity: &str) -> i32 { match severity.to_uppercase().as_str() { @@ -234,7 +234,7 @@ pub fn build_export_request(batch: &[(EventContext, CollectorEvent)]) -> ExportL /// group metric samples by endpoint and build an ExportMetricsServiceRequest. /// every sample maps to an OTLP `Gauge` point; Sum/Histogram is a follow-up. pub fn build_metrics_export_request( - batch: &[(EventContext, SensorHealthData)], + batch: &[(EventContext, MetricSample)], ) -> ExportMetricsServiceRequest { let observed_nanos = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -630,7 +630,7 @@ mod tests { collector_type: "nvue_gnmi", ..base_ctx }; - let sample = |name: &str| SensorHealthData { + let sample = |name: &str| MetricSample { key: "status:swp1".to_string(), name: name.to_string(), metric_type: "interface_oper_status".to_string(), @@ -660,7 +660,7 @@ mod tests { #[test] fn metric_export_name_uses_metric_type() { let ctx = test_context(); - let sample = SensorHealthData { + let sample = MetricSample { key: "asic0/oper_status".to_string(), name: "nvue_gnmi".to_string(), metric_type: "interface_oper_status".to_string(), diff --git a/crates/health/src/otlp/metrics_drain.rs b/crates/health/src/otlp/metrics_drain.rs index a3d281c4b5..bb04dac56a 100644 --- a/crates/health/src/otlp/metrics_drain.rs +++ b/crates/health/src/otlp/metrics_drain.rs @@ -24,7 +24,7 @@ use super::collector_metrics::metrics_service_client::MetricsServiceClient; use super::convert::build_metrics_export_request; use crate::collectors::{BackoffConfig, ExponentialBackoff}; use crate::sink::otlp::OtlpMetricsQueue; -use crate::sink::{EventContext, SensorHealthData}; +use crate::sink::{EventContext, MetricSample}; pub(crate) struct OtlpMetricsDrainTask { queue: Arc, @@ -48,7 +48,7 @@ impl OtlpMetricsDrainTask { } } - fn drain_batch(&self, batch: &mut Vec<(EventContext, SensorHealthData)>) { + fn drain_batch(&self, batch: &mut Vec<(EventContext, MetricSample)>) { let remaining = self.batch_size.saturating_sub(batch.len()); for _ in 0..remaining { match self.queue.pop() { @@ -127,7 +127,7 @@ impl OtlpMetricsDrainTask { async fn flush( &self, client: &mut MetricsServiceClient, - batch: &mut Vec<(EventContext, SensorHealthData)>, + batch: &mut Vec<(EventContext, MetricSample)>, ) { if batch.is_empty() { return; diff --git a/crates/health/src/processor/health_report.rs b/crates/health/src/processor/health_report.rs index ea0f9e40c9..4ccfc23089 100644 --- a/crates/health/src/processor/health_report.rs +++ b/crates/health/src/processor/health_report.rs @@ -22,8 +22,8 @@ use nv_redfish::resource::Health as BmcHealth; use super::{CollectorEvent, EventContext, EventProcessor}; use crate::sink::{ - Classification, HealthReport, HealthReportAlert, HealthReportSuccess, Probe, ReportSource, - SensorHealthContext, SensorHealthData, + Classification, HealthReport, HealthReportAlert, HealthReportSuccess, MetricSample, Probe, + ReportSource, SensorThresholdContext, }; #[derive(Debug, Clone, Copy)] @@ -83,7 +83,7 @@ impl HealthReportProcessor { } } - fn classify(health: &SensorHealthContext, reading: f64) -> SensorHealth { + fn classify(health: &SensorThresholdContext, reading: f64) -> SensorHealth { if let Some(max) = health.range_max && reading > max { @@ -135,8 +135,8 @@ impl HealthReportProcessor { } fn to_health_result( - metric: &SensorHealthData, - health: &SensorHealthContext, + metric: &MetricSample, + health: &SensorThresholdContext, ) -> SensorHealthResult { let classification = Self::classify(health, metric.value); @@ -295,14 +295,14 @@ mod tests { let _ = processor.process_event( &context, &CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "sensor-1".to_string(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), unit: "celsius".to_string(), value: 42.0, labels: vec![], - context: Some(SensorHealthContext { + context: Some(SensorThresholdContext { entity_type: "sensor".to_string(), sensor_id: "Temp1".to_string(), upper_fatal: None, diff --git a/crates/health/src/processor/leak_events.rs b/crates/health/src/processor/leak_events.rs index a7f088da7a..a34dadc775 100644 --- a/crates/health/src/processor/leak_events.rs +++ b/crates/health/src/processor/leak_events.rs @@ -215,7 +215,7 @@ mod tests { fn ignores_non_health_report_events() { let processor = LeakEventProcessor::new(1); let metric_event = CollectorEvent::Metric( - crate::sink::SensorHealthData { + crate::sink::MetricSample { key: "k".to_string(), name: "n".to_string(), metric_type: "gauge".to_string(), diff --git a/crates/health/src/processor/mod.rs b/crates/health/src/processor/mod.rs index 99cfccd526..ffb9a2efc5 100644 --- a/crates/health/src/processor/mod.rs +++ b/crates/health/src/processor/mod.rs @@ -199,7 +199,7 @@ mod tests { ); let event = CollectorEvent::Metric( - crate::sink::SensorHealthData { + crate::sink::MetricSample { key: "k".to_string(), name: "n".to_string(), metric_type: "gauge".to_string(), diff --git a/crates/health/src/sink/events.rs b/crates/health/src/sink/events.rs index abb0741822..77cc0f34cd 100644 --- a/crates/health/src/sink/events.rs +++ b/crates/health/src/sink/events.rs @@ -161,7 +161,7 @@ impl EventContext { } #[derive(Clone, Debug)] -pub struct SensorHealthContext { +pub struct SensorThresholdContext { pub entity_type: String, pub sensor_id: String, pub upper_fatal: Option, @@ -176,14 +176,14 @@ pub struct SensorHealthContext { } #[derive(Clone, Debug)] -pub struct SensorHealthData { +pub struct MetricSample { pub key: String, pub name: String, pub metric_type: String, pub unit: String, pub value: f64, pub labels: Vec, - pub context: Option, + pub context: Option, } #[derive(Clone, Debug)] @@ -232,7 +232,7 @@ impl HealthReport { #[derive(Clone, Debug)] pub enum CollectorEvent { MetricCollectionStart, - Metric(Box), + Metric(Box), MetricCollectionEnd, CollectorRemoved, Log(Box), diff --git a/crates/health/src/sink/mod.rs b/crates/health/src/sink/mod.rs index 5bf3c56bb7..da9f11186f 100644 --- a/crates/health/src/sink/mod.rs +++ b/crates/health/src/sink/mod.rs @@ -34,8 +34,8 @@ mod tracing; pub use composite::CompositeDataSink; pub use events::{ Classification, CollectorEvent, EventContext, FirmwareInfo, HealthReport, HealthReportAlert, - HealthReportSuccess, HealthReportTarget, LogRecord, Probe, ReportSource, SensorHealthContext, - SensorHealthData, + HealthReportSuccess, HealthReportTarget, LogRecord, MetricSample, Probe, ReportSource, + SensorThresholdContext, }; pub use health_report::HealthReportSink; pub use log_file::LogFileSink; @@ -65,8 +65,8 @@ mod tests { use mac_address::MacAddress; use super::{ - CollectorEvent, CompositeDataSink, DataSink, EventContext, LogRecord, PrometheusSink, - SensorHealthData, + CollectorEvent, CompositeDataSink, DataSink, EventContext, LogRecord, MetricSample, + PrometheusSink, }; use crate::endpoint::{BmcAddr, EndpointMetadata, MachineData}; use crate::metrics::MetricsManager; @@ -125,7 +125,7 @@ mod tests { }; let event = CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "key".to_string(), name: "metric".to_string(), metric_type: "gauge".to_string(), @@ -184,7 +184,7 @@ mod tests { assert!(!export_after_log.contains("test_sink_hw_sensor")); let metric_event = CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "metric_key".to_string(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), @@ -237,7 +237,7 @@ mod tests { }; let metric_event = CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "metric_key".to_string(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), @@ -294,7 +294,7 @@ mod tests { let start_event = CollectorEvent::MetricCollectionStart; sink.handle_event(&context, &start_event); let s1_event = CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "s1".to_string(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), @@ -317,7 +317,7 @@ mod tests { let start_event = CollectorEvent::MetricCollectionStart; sink.handle_event(&context, &start_event); let s2_event = CollectorEvent::Metric( - SensorHealthData { + MetricSample { key: "s2".to_string(), name: "hw_sensor".to_string(), metric_type: "temperature".to_string(), diff --git a/crates/health/src/sink/otlp.rs b/crates/health/src/sink/otlp.rs index be460e4e67..e3f6bdeb25 100644 --- a/crates/health/src/sink/otlp.rs +++ b/crates/health/src/sink/otlp.rs @@ -21,7 +21,7 @@ use prometheus::Counter; use super::dedup_queue::DedupQueue; use super::event_mapper::RedfishEventMapper; -use super::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use super::{CollectorEvent, DataSink, EventContext, MetricSample}; use crate::HealthError; use crate::config::OtlpSinkConfig; use crate::metrics::MetricsManager; @@ -29,7 +29,7 @@ use crate::otlp::drain::OtlpDrainTask; use crate::otlp::metrics_drain::OtlpMetricsDrainTask; pub(crate) type OtlpQueue = DedupQueue; -pub(crate) type OtlpMetricsQueue = DedupQueue; +pub(crate) type OtlpMetricsQueue = DedupQueue; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub(crate) struct OtlpMetricQueueKey { @@ -70,7 +70,7 @@ pub(crate) fn is_otlp_log_relevant(event: &CollectorEvent) -> bool { ) } -fn metric_queue_key(context: &EventContext, sample: &SensorHealthData) -> OtlpMetricQueueKey { +fn metric_queue_key(context: &EventContext, sample: &MetricSample) -> OtlpMetricQueueKey { OtlpMetricQueueKey { endpoint_key: context.endpoint_key.clone(), collector_type: context.collector_type, @@ -157,7 +157,7 @@ impl OtlpSink { self.queue.pop().map(|(_key, value)| value) } - pub fn pop_metric_for_bench(&self) -> Option<(EventContext, SensorHealthData)> { + pub fn pop_metric_for_bench(&self) -> Option<(EventContext, MetricSample)> { self.metrics_queue.pop().map(|(_key, value)| value) } } @@ -219,7 +219,7 @@ mod tests { use super::*; use crate::sink::event_mapper::OpenBmcEventMapper; - use crate::sink::{LogRecord, SensorHealthData}; + use crate::sink::{LogRecord, MetricSample}; fn test_context() -> EventContext { EventContext { @@ -260,7 +260,7 @@ mod tests { metric_type: &str, unit: &str, ) -> CollectorEvent { - CollectorEvent::Metric(Box::new(SensorHealthData { + CollectorEvent::Metric(Box::new(MetricSample { key: key.to_string(), name: name.to_string(), metric_type: metric_type.to_string(), diff --git a/crates/health/src/sink/prometheus.rs b/crates/health/src/sink/prometheus.rs index a2dc192518..3822b24d31 100644 --- a/crates/health/src/sink/prometheus.rs +++ b/crates/health/src/sink/prometheus.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use dashmap::DashMap; -use super::{CollectorEvent, DataSink, EventContext, SensorHealthData}; +use super::{CollectorEvent, DataSink, EventContext, MetricSample}; use crate::HealthError; use crate::metrics::{CollectorRegistry, GaugeMetrics, GaugeReading, MetricsManager}; @@ -65,7 +65,7 @@ impl PrometheusSink { ) } - fn metric_reading_key(sample: &SensorHealthData) -> String { + fn metric_reading_key(sample: &MetricSample) -> String { const KEY_SEPARATOR: &str = "::"; let separators_len = KEY_SEPARATOR.len() * 2; let mut key = String::with_capacity( From 5bdf721cb4e8de95ba918d4be0100377fbd67622 Mon Sep 17 00:00:00 2001 From: Ivan Anisimov Date: Wed, 10 Jun 2026 14:50:15 -0700 Subject: [PATCH 2/5] feat: hw-health fetch non-sensor metrics data --- crates/bmc-mock/src/redfish/processor.rs | 207 ++++++ crates/health/src/collectors/discovery.rs | 348 +++++++++ .../health/src/collectors/entity_metrics.rs | 688 ++++++++++++++++++ crates/health/src/collectors/inventory.rs | 222 ++++++ 4 files changed, 1465 insertions(+) create mode 100644 crates/bmc-mock/src/redfish/processor.rs create mode 100644 crates/health/src/collectors/discovery.rs create mode 100644 crates/health/src/collectors/entity_metrics.rs create mode 100644 crates/health/src/collectors/inventory.rs diff --git a/crates/bmc-mock/src/redfish/processor.rs b/crates/bmc-mock/src/redfish/processor.rs new file mode 100644 index 0000000000..d3c25f57e5 --- /dev/null +++ b/crates/bmc-mock/src/redfish/processor.rs @@ -0,0 +1,207 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::borrow::Cow; + +use serde_json::json; + +use crate::json::{JsonExt, JsonPatch}; +use crate::redfish; +use crate::redfish::Builder; + +pub fn system_collection(system_id: &str) -> redfish::Collection<'static> { + let odata_id = format!("/redfish/v1/Systems/{system_id}/Processors"); + redfish::Collection { + odata_id: Cow::Owned(odata_id), + odata_type: Cow::Borrowed("#ProcessorCollection.ProcessorCollection"), + name: Cow::Borrowed("Processors Collection"), + } +} + +pub fn system_resource<'a>(system_id: &str, processor_id: &'a str) -> redfish::Resource<'a> { + let odata_id = format!("/redfish/v1/Systems/{system_id}/Processors/{processor_id}"); + redfish::Resource { + odata_id: Cow::Owned(odata_id), + odata_type: Cow::Borrowed("#Processor.v1_20_0.Processor"), + id: Cow::Borrowed(processor_id), + name: Cow::Borrowed("Processor"), + } +} + +pub fn metrics_resource(system_id: &str, processor_id: &str) -> redfish::Resource<'static> { + let odata_id = + format!("/redfish/v1/Systems/{system_id}/Processors/{processor_id}/ProcessorMetrics"); + redfish::Resource { + odata_id: Cow::Owned(odata_id), + odata_type: Cow::Borrowed("#ProcessorMetrics.v1_6_1.ProcessorMetrics"), + id: Cow::Borrowed("ProcessorMetrics"), + name: Cow::Borrowed("Processor Metrics"), + } +} + +/// A mock Redfish `Processor` plus its associated `ProcessorMetrics` resource. +pub struct Processor { + pub id: Cow<'static, str>, + resource: serde_json::Value, + metrics: serde_json::Value, +} + +impl Processor { + pub fn to_json(&self) -> serde_json::Value { + self.resource.clone() + } + + pub fn metrics_json(&self) -> serde_json::Value { + self.metrics.clone() + } +} + +pub fn builder(resource: &redfish::Resource) -> ProcessorBuilder { + ProcessorBuilder { + id: Cow::Owned(resource.id.to_string()), + value: resource.json_patch(), + } +} + +pub struct ProcessorBuilder { + id: Cow<'static, str>, + value: serde_json::Value, +} + +impl Builder for ProcessorBuilder { + fn apply_patch(self, patch: serde_json::Value) -> Self { + Self { + value: self.value.patch(patch), + id: self.id, + } + } +} + +impl ProcessorBuilder { + pub fn processor_type(self, value: &str) -> Self { + self.add_str_field("ProcessorType", value) + } + + pub fn metrics(self, metrics: &redfish::Resource<'_>) -> Self { + self.apply_patch(metrics.nav_property("Metrics")) + } + + pub fn status(self, status: redfish::resource::Status) -> Self { + self.apply_patch(json!({ "Status": status.into_json() })) + } + + fn build(self, metrics: serde_json::Value) -> Processor { + Processor { + id: self.id, + resource: self.value, + metrics, + } + } +} + +pub fn gpu(system_id: &str, processor_id: &str, core_voltage_sensor_uri: &str) -> Processor { + let metrics = metrics_resource(system_id, processor_id); + let metrics_json = nvidia_gpu_metrics(&metrics, processor_id, core_voltage_sensor_uri); + builder(&system_resource(system_id, processor_id)) + .processor_type("GPU") + .status(redfish::resource::Status::Ok) + .metrics(&metrics) + .build(metrics_json) +} + +fn nvidia_gpu_metrics( + resource: &redfish::Resource<'_>, + processor_id: &str, + core_voltage_sensor_uri: &str, +) -> serde_json::Value { + resource.json_patch().patch(json!({ + "Name": format!("{processor_id} Processor Metrics"), + "BandwidthPercent": 0, + "OperatingSpeedMHz": 0, + "PowerLimitThrottleDuration": "PT0S", + "ThermalLimitThrottleDuration": "PT0S", + "CacheMetricsTotal": { + "LifeTime": { + "CorrectableECCErrorCount": 0, + "UncorrectableECCErrorCount": 0 + } + }, + "CoreVoltage": { + "DataSourceUri": core_voltage_sensor_uri, + "Reading": 0.8 + }, + "PCIeErrors": { + "CorrectableErrorCount": 2, + "FatalErrorCount": 0, + "L0ToRecoveryCount": 2, + "NAKReceivedCount": 0, + "NAKSentCount": 0, + "NonFatalErrorCount": 0, + "ReplayCount": 0, + "ReplayRolloverCount": 0, + "UnsupportedRequestCount": 0 + }, + "Oem": { + "Nvidia": { + "@odata.type": "#NvidiaProcessorMetrics.v1_4_0.NvidiaGPUProcessorMetrics", + "PCIeRXBytes": 45388, + "PCIeTXBytes": 51108, + "SMUtilizationPercent": 0, + "SRAMECCErrorThresholdExceeded": false, + "ThrottleReasons": ["NA"] + } + } + })) +} + +#[cfg(test)] +mod tests { + use super::gpu; + + #[test] + fn gpu_processor_links_to_metrics_and_chassis_sensor() { + let processor = gpu( + "HGX_Baseboard_0", + "GPU_0", + "/redfish/v1/Chassis/HGX_GPU_0/Sensors/Voltage_1", + ); + + let resource = processor.to_json(); + assert_eq!(resource["Id"], "GPU_0"); + assert_eq!(resource["ProcessorType"], "GPU"); + assert_eq!( + resource["Metrics"]["@odata.id"], + "/redfish/v1/Systems/HGX_Baseboard_0/Processors/GPU_0/ProcessorMetrics" + ); + + let metrics = processor.metrics_json(); + assert_eq!( + metrics["@odata.id"], + "/redfish/v1/Systems/HGX_Baseboard_0/Processors/GPU_0/ProcessorMetrics" + ); + // CoreVoltage is sensor-backed, so the health metrics collector skips it + // (it is emitted by the sensor collector via this DataSourceUri instead). + assert_eq!( + metrics["CoreVoltage"]["DataSourceUri"], + "/redfish/v1/Chassis/HGX_GPU_0/Sensors/Voltage_1" + ); + // Fields the metrics collector flattens and emits. + assert_eq!(metrics["PCIeErrors"]["CorrectableErrorCount"], 2); + assert_eq!(metrics["PowerLimitThrottleDuration"], "PT0S"); + assert_eq!(metrics["ThermalLimitThrottleDuration"], "PT0S"); + } +} diff --git a/crates/health/src/collectors/discovery.rs b/crates/health/src/collectors/discovery.rs new file mode 100644 index 0000000000..0d790bc6d8 --- /dev/null +++ b/crates/health/src/collectors/discovery.rs @@ -0,0 +1,348 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::{StreamExt, stream}; +use nv_redfish::ServiceRoot; +use nv_redfish::core::Bmc; + +use crate::HealthError; +use crate::collectors::inventory::{DiscoveredEntity, EntityInventory, SharedInventory}; +use crate::collectors::runtime::{IterationResult, PeriodicCollector}; +use crate::endpoint::BmcEndpoint; + +/// Configuration for the entity discovery collector +pub struct EntityDiscoveryCollectorConfig { + pub(crate) shared: SharedInventory, + pub discovery_concurrency: usize, +} + +pub struct EntityDiscoveryCollector { + endpoint: Arc, + bmc: Arc, + shared: SharedInventory, + discovery_concurrency: usize, + generation: u64, +} + +impl PeriodicCollector for EntityDiscoveryCollector { + type Config = EntityDiscoveryCollectorConfig; + + fn new_runner( + bmc: Arc, + endpoint: Arc, + config: Self::Config, + ) -> Result { + Ok(Self { + endpoint, + bmc, + shared: config.shared, + discovery_concurrency: config.discovery_concurrency.max(1), + generation: 0, + }) + } + + async fn run_iteration(&mut self) -> Result { + let fetch_failures = AtomicUsize::new(0); + let entities = self.discover_entities(&fetch_failures).await?; + let entity_count = entities.len(); + + self.generation = self.generation.wrapping_add(1); + self.shared.store(Some(Arc::new(EntityInventory { + entities, + discovered_at: std::time::Instant::now(), + generation: self.generation, + }))); + + tracing::info!( + bmc = %self.endpoint.addr.mac, + entity_count, + generation = self.generation, + "Published entity inventory snapshot" + ); + + Ok(IterationResult { + refresh_triggered: true, + entity_count: Some(entity_count), + fetch_failures: fetch_failures.load(Ordering::Relaxed), + }) + } + + fn collector_type(&self) -> &'static str { + "entity_discovery_collector" + } + + async fn stop(&mut self) { + // Clear the snapshot so readers stop emitting for a removed endpoint. + self.shared.store(None); + } +} + +impl EntityDiscoveryCollector { + fn record_failure( + &self, + result: Result, + context: &str, + fetch_failures: &AtomicUsize, + ) -> Option { + match result { + Ok(value) => Some(value), + Err(error) => { + fetch_failures.fetch_add(1, Ordering::Relaxed); + tracing::warn!(?error, context, bmc_addr = ?self.endpoint.addr, "Discovery fetch failed"); + None + } + } + } + + async fn discover_entities( + &self, + fetch_failures: &AtomicUsize, + ) -> Result>, HealthError> { + let service_root = ServiceRoot::new(self.bmc.clone()).await?; + + let mut entities = Vec::new(); + let mut sensor_ids = HashSet::new(); + + if let Some(systems) = service_root.systems().await? { + for system in systems.members().await? { + let system = Arc::new(system); + + self.discover_processors(&system, fetch_failures, &mut entities, &mut sensor_ids) + .await; + self.discover_memory(&system, fetch_failures, &mut entities, &mut sensor_ids) + .await; + self.discover_drives(&system, fetch_failures, &mut entities, &mut sensor_ids) + .await; + } + } + + if let Some(chassis_list) = service_root.chassis().await? { + for chassis in chassis_list.members().await? { + let chassis = Arc::new(chassis); + + self.discover_power_supplies( + &chassis, + fetch_failures, + &mut entities, + &mut sensor_ids, + ) + .await; + self.discover_chassis(&chassis, fetch_failures, &mut entities, &mut sensor_ids) + .await; + } + } + + Ok(entities) + } + + async fn discover_processors( + &self, + system: &Arc>, + fetch_failures: &AtomicUsize, + entities: &mut Vec>, + sensor_ids: &mut HashSet, + ) { + let processors = self + .record_failure(system.processors().await, "get processors", fetch_failures) + .flatten() + .unwrap_or_default(); + + let discovered: Vec<_> = stream::iter(processors) + .map(|processor| async move { + let processor = Arc::new(processor); + let env = processor + .environment_sensor_links() + .await + .unwrap_or_default(); + let metric = processor.metrics_sensor_links().await.unwrap_or_default(); + let sensors: Vec<_> = env.into_iter().chain(metric).collect(); + (processor, sensors) + }) + .buffer_unordered(self.discovery_concurrency) + .collect() + .await; + + for (entity, sensors) in discovered { + for sensor in &sensors { + sensor_ids.insert(sensor.odata_id().to_string()); + } + entities.push(DiscoveredEntity::Processor { + entity, + system: system.clone(), + sensors, + }); + } + } + + async fn discover_memory( + &self, + system: &Arc>, + fetch_failures: &AtomicUsize, + entities: &mut Vec>, + sensor_ids: &mut HashSet, + ) { + let memory_modules = self + .record_failure( + system.memory_modules().await, + "get memory modules", + fetch_failures, + ) + .flatten() + .unwrap_or_default(); + + let discovered: Vec<_> = stream::iter(memory_modules) + .map(|memory| async move { + let memory = Arc::new(memory); + let sensors = memory.environment_sensor_links().await.unwrap_or_default(); + (memory, sensors) + }) + .buffer_unordered(self.discovery_concurrency) + .collect() + .await; + + for (entity, sensors) in discovered { + for sensor in &sensors { + sensor_ids.insert(sensor.odata_id().to_string()); + } + entities.push(DiscoveredEntity::Memory { + entity, + system: system.clone(), + sensors, + }); + } + } + + async fn discover_drives( + &self, + system: &Arc>, + fetch_failures: &AtomicUsize, + entities: &mut Vec>, + sensor_ids: &mut HashSet, + ) { + let storage_list = self + .record_failure( + system.storage_controllers().await, + "get storage", + fetch_failures, + ) + .flatten() + .unwrap_or_default(); + + for storage in storage_list { + let storage = Arc::new(storage); + let drives = self + .record_failure(storage.drives().await, "get drives", fetch_failures) + .flatten() + .unwrap_or_default(); + + let discovered: Vec<_> = stream::iter(drives) + .map(|drive| async move { + let drive = Arc::new(drive); + let sensors = drive.environment_sensor_links().await.unwrap_or_default(); + (drive, sensors) + }) + .buffer_unordered(self.discovery_concurrency) + .collect() + .await; + + for (entity, sensors) in discovered { + for sensor in &sensors { + sensor_ids.insert(sensor.odata_id().to_string()); + } + entities.push(DiscoveredEntity::Drive { + entity, + storage: storage.clone(), + system: system.clone(), + sensors, + }); + } + } + } + + async fn discover_power_supplies( + &self, + chassis: &Arc>, + fetch_failures: &AtomicUsize, + entities: &mut Vec>, + sensor_ids: &mut HashSet, + ) { + let power_supplies = self + .record_failure( + chassis.power_supplies().await, + "get power supplies", + fetch_failures, + ) + .unwrap_or_default(); + + let discovered: Vec<_> = stream::iter(power_supplies) + .map(|ps| async move { + let ps = Arc::new(ps); + let sensors = ps.metrics_sensor_links().await.unwrap_or_default(); + (ps, sensors) + }) + .buffer_unordered(self.discovery_concurrency) + .collect() + .await; + + for (entity, sensors) in discovered { + for sensor in &sensors { + sensor_ids.insert(sensor.odata_id().to_string()); + } + entities.push(DiscoveredEntity::PowerSupply { + entity, + chassis: chassis.clone(), + sensors, + }); + } + } + + async fn discover_chassis( + &self, + chassis: &Arc>, + fetch_failures: &AtomicUsize, + entities: &mut Vec>, + sensor_ids: &mut HashSet, + ) { + let sensors = match chassis.sensor_links().await { + Ok(Some(sensors)) => sensors, + Ok(None) => Vec::new(), + Err(error) => { + fetch_failures.fetch_add(1, Ordering::Relaxed); + tracing::warn!(?error, bmc_addr = ?self.endpoint.addr, "Failed to get chassis sensors"); + Vec::new() + } + }; + + let sensors: Vec<_> = sensors + .into_iter() + .filter(|sensor| sensor_ids.insert(sensor.odata_id().to_string())) + .collect(); + + if sensors.is_empty() { + return; + } + + entities.push(DiscoveredEntity::Chassis { + entity: chassis.clone(), + sensors, + }); + } +} diff --git a/crates/health/src/collectors/entity_metrics.rs b/crates/health/src/collectors/entity_metrics.rs new file mode 100644 index 0000000000..b776a2a692 --- /dev/null +++ b/crates/health/src/collectors/entity_metrics.rs @@ -0,0 +1,688 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::borrow::Cow; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::{StreamExt, stream}; +use nv_redfish::core::Bmc; +use nv_redfish::schema::memory_metrics::MemoryMetrics; +use nv_redfish::schema::pcie_device::PcieErrors; +use nv_redfish::schema::power_supply_metrics::PowerSupplyMetrics; +use nv_redfish::schema::processor_metrics::ProcessorMetrics; + +use crate::HealthError; +use crate::collectors::inventory::{DiscoveredEntity, SharedInventory}; +use crate::collectors::runtime::{IterationResult, PeriodicCollector}; +use crate::endpoint::BmcEndpoint; +use crate::sink::{CollectorEvent, DataSink, EventContext, MetricSample}; + +struct MetricField { + metric_type: Cow<'static, str>, + unit: &'static str, + value: f64, +} + +/// Push a scalar `Option>>` field if present +macro_rules! scalar { + ($out:expr, $src:expr, $field:ident, $mt:literal, $unit:literal) => { + if let Some(Some(value)) = $src.$field { + $out.push(MetricField { + metric_type: Cow::Borrowed($mt), + unit: $unit, + value: value as f64, + }); + } + }; +} + +/// Push a field only when it is not backed by a sensor +macro_rules! excerpt { + ($out:expr, $src:expr, $field:ident, $mt:literal, $unit:literal) => { + if let Some(excerpt) = &$src.$field { + let sensor_backed = excerpt + .data_source_uri + .as_ref() + .and_then(|inner| inner.as_ref()) + .is_some(); + if !sensor_backed { + if let Some(Some(value)) = excerpt.reading { + $out.push(MetricField { + metric_type: Cow::Borrowed($mt), + unit: $unit, + value, + }); + } + } + } + }; +} + +/// Push an ISO 8601 `Edm.Duration` field (e.g. `"PT0S"`) as seconds. +macro_rules! duration_seconds { + ($out:expr, $src:expr, $field:ident, $mt:literal) => { + if let Some(Some(duration)) = &$src.$field { + $out.push(MetricField { + metric_type: Cow::Borrowed($mt), + unit: "seconds", + value: duration.as_f64_seconds(), + }); + } + }; +} + +fn pcie_error_fields(out: &mut Vec, pcie: &PcieErrors) { + scalar!( + out, + pcie, + correctable_error_count, + "pcie_correctable_errors", + "count" + ); + scalar!( + out, + pcie, + non_fatal_error_count, + "pcie_non_fatal_errors", + "count" + ); + scalar!(out, pcie, fatal_error_count, "pcie_fatal_errors", "count"); + scalar!( + out, + pcie, + l0to_recovery_count, + "pcie_l0_to_recovery", + "count" + ); + scalar!(out, pcie, replay_count, "pcie_replay", "count"); + scalar!( + out, + pcie, + replay_rollover_count, + "pcie_replay_rollover", + "count" + ); + scalar!(out, pcie, nak_sent_count, "pcie_nak_sent", "count"); + scalar!(out, pcie, nak_received_count, "pcie_nak_received", "count"); + scalar!( + out, + pcie, + unsupported_request_count, + "pcie_unsupported_request", + "count" + ); + scalar!(out, pcie, bad_tlp_count, "pcie_bad_tlp", "count"); + scalar!(out, pcie, bad_dllp_count, "pcie_bad_dllp", "count"); + scalar!( + out, + pcie, + flow_control_timeout_errors, + "pcie_flow_control_timeout", + "count" + ); +} + +fn processor_metric_fields(m: &ProcessorMetrics) -> Vec { + let mut out = Vec::new(); + scalar!(out, m, bandwidth_percent, "bandwidth", "percent"); + scalar!(out, m, average_frequency_mhz, "average_frequency", "mhz"); + scalar!(out, m, throttling_celsius, "throttling", "celsius"); + scalar!(out, m, temperature_celsius, "temperature", "celsius"); + scalar!(out, m, consumed_power_watt, "consumed_power", "watts"); + scalar!(out, m, frequency_ratio, "frequency_ratio", "ratio"); + scalar!( + out, + m, + local_memory_bandwidth_bytes, + "local_memory_bandwidth", + "bytes" + ); + scalar!( + out, + m, + remote_memory_bandwidth_bytes, + "remote_memory_bandwidth", + "bytes" + ); + scalar!(out, m, kernel_percent, "kernel_time", "percent"); + scalar!(out, m, user_percent, "user_time", "percent"); + scalar!(out, m, operating_speed_mhz, "operating_speed", "mhz"); + scalar!( + out, + m, + correctable_core_error_count, + "correctable_core_errors", + "count" + ); + scalar!( + out, + m, + uncorrectable_core_error_count, + "uncorrectable_core_errors", + "count" + ); + scalar!( + out, + m, + correctable_other_error_count, + "correctable_other_errors", + "count" + ); + scalar!( + out, + m, + uncorrectable_other_error_count, + "uncorrectable_other_errors", + "count" + ); + duration_seconds!( + out, + m, + power_limit_throttle_duration, + "power_limit_throttle" + ); + duration_seconds!( + out, + m, + thermal_limit_throttle_duration, + "thermal_limit_throttle" + ); + excerpt!(out, m, core_voltage, "core_voltage", "volts"); + if let Some(pcie) = &m.pcie_errors { + pcie_error_fields(&mut out, pcie); + } + out +} + +fn memory_metric_fields(m: &MemoryMetrics) -> Vec { + let mut out = Vec::new(); + scalar!(out, m, block_size_bytes, "block_size", "bytes"); + scalar!(out, m, bandwidth_percent, "bandwidth", "percent"); + scalar!(out, m, operating_speed_mhz, "operating_speed", "mhz"); + scalar!( + out, + m, + corrected_volatile_error_count, + "corrected_volatile_errors", + "count" + ); + scalar!( + out, + m, + corrected_persistent_error_count, + "corrected_persistent_errors", + "count" + ); + scalar!(out, m, dirty_shutdown_count, "dirty_shutdown", "count"); + scalar!( + out, + m, + capacity_utilization_percent, + "capacity_utilization", + "percent" + ); + if let Some(cp) = &m.current_period { + scalar!( + out, + cp, + correctable_ecc_error_count, + "current_correctable_ecc_errors", + "count" + ); + scalar!( + out, + cp, + uncorrectable_ecc_error_count, + "current_uncorrectable_ecc_errors", + "count" + ); + scalar!( + out, + cp, + indeterminate_correctable_error_count, + "current_indeterminate_correctable_errors", + "count" + ); + scalar!( + out, + cp, + indeterminate_uncorrectable_error_count, + "current_indeterminate_uncorrectable_errors", + "count" + ); + } + if let Some(lt) = &m.life_time { + scalar!( + out, + lt, + correctable_ecc_error_count, + "lifetime_correctable_ecc_errors", + "count" + ); + scalar!( + out, + lt, + uncorrectable_ecc_error_count, + "lifetime_uncorrectable_ecc_errors", + "count" + ); + scalar!( + out, + lt, + indeterminate_correctable_error_count, + "lifetime_indeterminate_correctable_errors", + "count" + ); + scalar!( + out, + lt, + indeterminate_uncorrectable_error_count, + "lifetime_indeterminate_uncorrectable_errors", + "count" + ); + } + out +} + +fn drive_metric_fields(m: &nv_redfish::schema::drive_metrics::DriveMetrics) -> Vec { + let mut out = Vec::new(); + scalar!( + out, + m, + correctable_io_read_error_count, + "correctable_io_read_errors", + "count" + ); + scalar!( + out, + m, + correctable_io_write_error_count, + "correctable_io_write_errors", + "count" + ); + scalar!( + out, + m, + uncorrectable_io_read_error_count, + "uncorrectable_io_read_errors", + "count" + ); + scalar!( + out, + m, + uncorrectable_io_write_error_count, + "uncorrectable_io_write_errors", + "count" + ); + scalar!(out, m, bad_block_count, "bad_block", "count"); + scalar!(out, m, power_on_hours, "power_on_hours", "hours"); + scalar!( + out, + m, + native_command_queue_depth, + "native_command_queue_depth", + "count" + ); + scalar!(out, m, read_ioki_bytes, "read_io", "kibibytes"); + scalar!(out, m, write_ioki_bytes, "write_io", "kibibytes"); + out +} + +fn power_supply_metric_fields(m: &PowerSupplyMetrics) -> Vec { + let mut out = Vec::new(); + excerpt!(out, m, input_voltage, "input_voltage", "volts"); + excerpt!(out, m, input_current_amps, "input_current", "amperes"); + excerpt!(out, m, input_power_watts, "input_power", "watts"); + excerpt!(out, m, energyk_wh, "energy", "kilowatt_hours"); + excerpt!(out, m, frequency_hz, "frequency", "hertz"); + excerpt!(out, m, output_power_watts, "output_power", "watts"); + excerpt!(out, m, temperature_celsius, "temperature", "celsius"); + excerpt!(out, m, fan_speed_percent, "fan_speed", "percent"); + out +} + +pub struct MetricsCollectorConfig { + pub data_sink: Option>, + pub(crate) shared: SharedInventory, + pub fetch_concurrency: usize, +} + +pub struct MetricsCollector { + endpoint: Arc, + event_context: EventContext, + shared: SharedInventory, + data_sink: Option>, + fetch_concurrency: usize, +} + +impl PeriodicCollector for MetricsCollector { + type Config = MetricsCollectorConfig; + + fn new_runner( + _bmc: Arc, + endpoint: Arc, + config: Self::Config, + ) -> Result { + let event_context = EventContext::from_endpoint(endpoint.as_ref(), "metrics_collector"); + Ok(Self { + endpoint, + event_context, + shared: config.shared, + data_sink: config.data_sink, + fetch_concurrency: config.fetch_concurrency.max(1), + }) + } + + async fn run_iteration(&mut self) -> Result { + let Some(inventory) = self.shared.load_full() else { + tracing::debug!( + bmc_addr = ?self.endpoint.addr, + "No entity inventory available yet; skipping metrics iteration" + ); + return Ok(IterationResult { + refresh_triggered: false, + entity_count: None, + fetch_failures: 0, + }); + }; + + tracing::debug!( + bmc_addr = ?self.endpoint.addr, + generation = inventory.generation, + inventory_age_secs = inventory.discovered_at.elapsed().as_secs(), + entity_count = inventory.entities.len(), + "Reading entity inventory snapshot for metrics iteration" + ); + + let fetch_failures = AtomicUsize::new(0); + self.emit_event(CollectorEvent::MetricCollectionStart); + + let this = &*self; + let failures = &fetch_failures; + let futures: Vec<_> = inventory + .entities + .iter() + .map(|entity| this.collect_entity(entity, failures)) + .collect(); + + let processed: usize = stream::iter(futures) + .buffer_unordered(self.fetch_concurrency) + .collect::>() + .await + .into_iter() + .sum(); + + self.emit_event(CollectorEvent::MetricCollectionEnd); + + Ok(IterationResult { + refresh_triggered: false, + entity_count: Some(processed), + fetch_failures: fetch_failures.load(Ordering::Relaxed), + }) + } + + fn collector_type(&self) -> &'static str { + "metrics_collector" + } + + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); + } +} + +impl MetricsCollector { + fn emit_event(&self, event: CollectorEvent) { + if let Some(data_sink) = &self.data_sink { + data_sink.handle_event(&self.event_context, &event); + } + } + + async fn collect_entity( + &self, + entity: &DiscoveredEntity, + fetch_failures: &AtomicUsize, + ) -> usize { + let fields = match entity { + DiscoveredEntity::Processor { entity, .. } => { + match self.fetch(entity.metrics().await, "processor metrics", fetch_failures) { + Some(Some(m)) => processor_metric_fields(&m), + _ => return 0, + } + } + DiscoveredEntity::Memory { entity, .. } => { + match self.fetch(entity.metrics().await, "memory metrics", fetch_failures) { + Some(Some(m)) => memory_metric_fields(&m), + _ => return 0, + } + } + DiscoveredEntity::Drive { entity, .. } => { + match self.fetch(entity.metrics().await, "drive metrics", fetch_failures) { + Some(Some(m)) => drive_metric_fields(&m), + _ => return 0, + } + } + DiscoveredEntity::PowerSupply { entity, .. } => { + match self.fetch( + entity.metrics().await, + "power supply metrics", + fetch_failures, + ) { + Some(Some(m)) => power_supply_metric_fields(&m), + _ => return 0, + } + } + DiscoveredEntity::Chassis { .. } => return 0, + }; + + if fields.is_empty() { + return 0; + } + + // The metric_type and unit are encoded in the Prometheus series name + // (`{prefix}_hw_metric_{metric_type}_{unit}`), so they are not repeated + // as labels here. + let mut base = entity.base_attributes(); + base.extend(entity.entity_specific_attributes()); + + let entity_key = entity.key(); + let count = fields.len(); + for field in fields { + self.emit_event(CollectorEvent::Metric( + MetricSample { + key: format!("{entity_key}/{}", field.metric_type), + name: "hw_metric".to_string(), + metric_type: field.metric_type.to_string(), + unit: field.unit.to_string(), + value: field.value, + labels: base.clone(), + context: None, + } + .into(), + )); + } + count + } + + fn fetch( + &self, + result: Result, + context: &str, + fetch_failures: &AtomicUsize, + ) -> Option { + match result { + Ok(value) => Some(value), + Err(error) => { + fetch_failures.fetch_add(1, Ordering::Relaxed); + tracing::warn!(?error, context, bmc_addr = ?self.endpoint.addr, "Failed to fetch metrics resource"); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use serde_json::json; + + use super::*; + + fn by_type(fields: &[MetricField]) -> HashMap { + fields + .iter() + .map(|f| (f.metric_type.to_string(), (f.unit, f.value))) + .collect() + } + + #[test] + fn processor_scalars_and_pcie_errors_are_flattened() { + let metrics: ProcessorMetrics = serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Systems/1/Processors/CPU0/ProcessorMetrics", + "Id": "ProcessorMetrics", + "Name": "Processor Metrics", + "BandwidthPercent": 42.5, + "OperatingSpeedMHz": 3200, + "CorrectableCoreErrorCount": 7, + "UncorrectableCoreErrorCount": 0, + "PCIeErrors": { + "CorrectableErrorCount": 3, + "FatalErrorCount": 1 + }, + "PowerLimitThrottleDuration": "PT0S", + "ThermalLimitThrottleDuration": "PT1M30S" + })) + .expect("processor metrics should deserialize"); + + let fields = by_type(&processor_metric_fields(&metrics)); + assert_eq!(fields.get("bandwidth"), Some(&("percent", 42.5))); + assert_eq!(fields.get("operating_speed"), Some(&("mhz", 3200.0))); + assert_eq!(fields.get("correctable_core_errors"), Some(&("count", 7.0))); + assert_eq!( + fields.get("uncorrectable_core_errors"), + Some(&("count", 0.0)) + ); + assert_eq!(fields.get("pcie_correctable_errors"), Some(&("count", 3.0))); + assert_eq!(fields.get("pcie_fatal_errors"), Some(&("count", 1.0))); + // ISO 8601 durations are emitted as seconds. + assert_eq!(fields.get("power_limit_throttle"), Some(&("seconds", 0.0))); + assert_eq!( + fields.get("thermal_limit_throttle"), + Some(&("seconds", 90.0)) + ); + } + + #[test] + fn sensor_backed_excerpt_is_skipped_but_inline_excerpt_is_emitted() { + // CoreVoltage carrying a DataSourceUri is already published as hw_sensor + // and must NOT be re-emitted here. + let linked: ProcessorMetrics = serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Systems/1/Processors/CPU0/ProcessorMetrics", + "Id": "ProcessorMetrics", + "Name": "Processor Metrics", + "CoreVoltage": { + "DataSourceUri": "/redfish/v1/Chassis/1/Sensors/CPU0_Voltage", + "Reading": 1.2 + } + })) + .expect("deserialize"); + assert!(!by_type(&processor_metric_fields(&linked)).contains_key("core_voltage")); + + // Without a DataSourceUri the inline reading is emitted. + let inline: ProcessorMetrics = serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Systems/1/Processors/CPU0/ProcessorMetrics", + "Id": "ProcessorMetrics", + "Name": "Processor Metrics", + "CoreVoltage": { "Reading": 1.05 } + })) + .expect("deserialize"); + assert_eq!( + by_type(&processor_metric_fields(&inline)).get("core_voltage"), + Some(&("volts", 1.05)) + ); + } + + #[test] + fn memory_nested_periods_are_flattened_with_prefixes() { + let metrics: MemoryMetrics = serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Systems/1/Memory/DIMM0/MemoryMetrics", + "Id": "MemoryMetrics", + "Name": "Memory Metrics", + "CorrectedVolatileErrorCount": 2, + "CurrentPeriod": { "CorrectableECCErrorCount": 5 }, + "LifeTime": { "UncorrectableECCErrorCount": 9 } + })) + .expect("memory metrics should deserialize"); + + let fields = by_type(&memory_metric_fields(&metrics)); + assert_eq!( + fields.get("corrected_volatile_errors"), + Some(&("count", 2.0)) + ); + assert_eq!( + fields.get("current_correctable_ecc_errors"), + Some(&("count", 5.0)) + ); + assert_eq!( + fields.get("lifetime_uncorrectable_ecc_errors"), + Some(&("count", 9.0)) + ); + } + + #[test] + fn drive_io_error_counters_are_emitted() { + let metrics: nv_redfish::schema::drive_metrics::DriveMetrics = + serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Systems/1/Storage/1/Drives/D0/Metrics", + "Id": "DriveMetrics", + "Name": "Drive Metrics", + "BadBlockCount": 4, + "CorrectableIOReadErrorCount": 11, + "PowerOnHours": 12345.0 + })) + .expect("drive metrics should deserialize"); + + let fields = by_type(&drive_metric_fields(&metrics)); + assert_eq!(fields.get("bad_block"), Some(&("count", 4.0))); + assert_eq!( + fields.get("correctable_io_read_errors"), + Some(&("count", 11.0)) + ); + assert_eq!(fields.get("power_on_hours"), Some(&("hours", 12345.0))); + } + + #[test] + fn power_supply_metrics_skip_sensor_backed_excerpts() { + let metrics: PowerSupplyMetrics = serde_json::from_value(json!({ + "@odata.id": "/redfish/v1/Chassis/1/PowerSubsystem/PowerSupplies/PSU0/Metrics", + "Id": "PowerSupplyMetrics", + "Name": "Power Supply Metrics", + "InputVoltage": { + "DataSourceUri": "/redfish/v1/Chassis/1/Sensors/PSU0_Vin", + "Reading": 230.0 + }, + "OutputPowerWatts": { "Reading": 500.0 } + })) + .expect("power supply metrics should deserialize"); + + let fields = by_type(&power_supply_metric_fields(&metrics)); + // Sensor-backed input voltage is skipped; inline output power is kept. + assert!(!fields.contains_key("input_voltage")); + assert_eq!(fields.get("output_power"), Some(&("watts", 500.0))); + } +} diff --git a/crates/health/src/collectors/inventory.rs b/crates/health/src/collectors/inventory.rs new file mode 100644 index 0000000000..3e390fcd2e --- /dev/null +++ b/crates/health/src/collectors/inventory.rs @@ -0,0 +1,222 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::borrow::Cow; +use std::sync::Arc; +use std::time::Instant; + +use arc_swap::ArcSwapOption; +use nv_redfish::Resource; +use nv_redfish::chassis::{Chassis, PowerSupply}; +use nv_redfish::computer_system::{ComputerSystem, Drive, Memory, Processor, Storage}; +use nv_redfish::core::{Bmc, ToSnakeCase}; +use nv_redfish::sensor::SensorLink; + +use crate::metrics::MetricLabel; + +pub(crate) struct DerivedMetric { + pub(crate) metric_type: &'static str, + pub(crate) unit: &'static str, + pub(crate) value: f64, +} + +pub(crate) enum DiscoveredEntity { + Processor { + entity: Arc>, + system: Arc>, + sensors: Vec>, + }, + Memory { + entity: Arc>, + system: Arc>, + sensors: Vec>, + }, + Drive { + entity: Arc>, + storage: Arc>, + system: Arc>, + sensors: Vec>, + }, + PowerSupply { + entity: Arc>, + chassis: Arc>, + sensors: Vec>, + }, + Chassis { + entity: Arc>, + sensors: Vec>, + }, +} + +impl DiscoveredEntity { + pub(crate) fn sensors(&self) -> &[SensorLink] { + match self { + DiscoveredEntity::Processor { sensors, .. } + | DiscoveredEntity::Memory { sensors, .. } + | DiscoveredEntity::Drive { sensors, .. } + | DiscoveredEntity::PowerSupply { sensors, .. } + | DiscoveredEntity::Chassis { sensors, .. } => sensors, + } + } + + pub(crate) fn entity_type(&self) -> &'static str { + match self { + DiscoveredEntity::Processor { .. } => "processor", + DiscoveredEntity::Memory { .. } => "memory", + DiscoveredEntity::Drive { .. } => "drive", + DiscoveredEntity::PowerSupply { .. } => "powersupply", + DiscoveredEntity::Chassis { .. } => "chassis", + } + } + + pub(crate) fn physical_context_fallback(&self) -> &'static str { + match self { + DiscoveredEntity::Processor { .. } => "cpu", + DiscoveredEntity::Memory { .. } => "memory", + DiscoveredEntity::Drive { .. } => "storage_device", + DiscoveredEntity::PowerSupply { .. } => "power_supply", + DiscoveredEntity::Chassis { .. } => "chassis", + } + } + + pub(crate) fn base_attributes(&self) -> Vec { + match self { + DiscoveredEntity::Processor { entity, system, .. } => vec![ + (Cow::Borrowed("processor_id"), entity.raw().base.id.clone()), + (Cow::Borrowed("system_id"), system.raw().base.id.clone()), + ], + DiscoveredEntity::Memory { entity, system, .. } => vec![ + (Cow::Borrowed("memory_id"), entity.raw().base.id.clone()), + (Cow::Borrowed("system_id"), system.raw().base.id.clone()), + ], + DiscoveredEntity::Drive { + entity, + system, + storage, + .. + } => vec![ + (Cow::Borrowed("drive_id"), entity.raw().base.id.clone()), + (Cow::Borrowed("storage_id"), storage.raw().base.id.clone()), + (Cow::Borrowed("system_id"), system.raw().base.id.clone()), + ], + DiscoveredEntity::PowerSupply { + entity, chassis, .. + } => vec![ + ( + Cow::Borrowed("powersupply_id"), + entity.raw().base.id.clone(), + ), + (Cow::Borrowed("chassis_id"), chassis.raw().base.id.clone()), + ], + DiscoveredEntity::Chassis { entity, .. } => { + vec![(Cow::Borrowed("chassis_id"), entity.raw().base.id.clone())] + } + } + } + + pub(crate) fn entity_specific_attributes(&self) -> Vec { + let mut attrs = Vec::new(); + match self { + DiscoveredEntity::Processor { entity, .. } => { + if let Some(processor_type) = entity.raw().processor_type.flatten() { + attrs.push(( + Cow::Borrowed("processor_type"), + processor_type.to_snake_case().to_string(), + )); + } + if let Some(model) = entity.raw().model.clone().flatten() { + attrs.push((Cow::Borrowed("model"), model)); + } + } + DiscoveredEntity::Memory { entity, .. } => { + if let Some(device_type) = entity.raw().memory_device_type.flatten() { + attrs.push(( + Cow::Borrowed("device_type"), + device_type.to_snake_case().to_string(), + )); + } + if let Some(model) = entity.raw().model.clone().flatten() { + attrs.push((Cow::Borrowed("model"), model)); + } + } + DiscoveredEntity::Drive { entity, .. } => { + if let Some(model) = entity.raw().model.clone().flatten() { + attrs.push((Cow::Borrowed("model"), model)); + } + } + DiscoveredEntity::PowerSupply { entity, .. } => { + if let Some(model) = entity.raw().model.clone().flatten() { + attrs.push((Cow::Borrowed("model"), model)); + } + } + DiscoveredEntity::Chassis { entity, .. } => { + if let Some(model) = entity.raw().model.clone().flatten() { + attrs.push((Cow::Borrowed("model"), model)); + } + } + } + attrs + } + + pub(crate) fn key(&self) -> String { + match self { + DiscoveredEntity::Processor { entity, .. } => entity.odata_id().to_string(), + DiscoveredEntity::Memory { entity, .. } => entity.odata_id().to_string(), + DiscoveredEntity::Drive { entity, .. } => entity.odata_id().to_string(), + DiscoveredEntity::PowerSupply { entity, .. } => entity.odata_id().to_string(), + DiscoveredEntity::Chassis { entity, .. } => entity.odata_id().to_string(), + } + } + + pub(crate) fn derived_metrics(&self) -> Vec { + match self { + DiscoveredEntity::Drive { entity, .. } => entity + .raw() + .predicted_media_life_left_percent + .flatten() + .map(|value| { + vec![DerivedMetric { + metric_type: "drive_predicted_media_life_left", + unit: "percentage", + value, + }] + }) + .unwrap_or_default(), + DiscoveredEntity::PowerSupply { entity, .. } => entity + .raw() + .power_capacity_watts + .flatten() + .map(|value| { + vec![DerivedMetric { + metric_type: "powersupply_capacity", + unit: "watts", + value, + }] + }) + .unwrap_or_default(), + _ => Vec::new(), + } + } +} + +pub(crate) struct EntityInventory { + pub(crate) entities: Vec>, + pub(crate) discovered_at: Instant, + pub(crate) generation: u64, +} + +pub(crate) type SharedInventory = Arc>>; From 5c9b72ac2a3f180e51039687096878f53358f93f Mon Sep 17 00:00:00 2001 From: Ivan Anisimov Date: Thu, 11 Jun 2026 10:06:31 -0700 Subject: [PATCH 3/5] feat: hw-health fetch non-sensor metrics data --- crates/bmc-mock/src/hw/nvidia_gb200.rs | 35 +++++++++++++--- crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs | 48 ++++------------------ 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/crates/bmc-mock/src/hw/nvidia_gb200.rs b/crates/bmc-mock/src/hw/nvidia_gb200.rs index 0a23a58eae..ede6cfb74f 100644 --- a/crates/bmc-mock/src/hw/nvidia_gb200.rs +++ b/crates/bmc-mock/src/hw/nvidia_gb200.rs @@ -77,11 +77,36 @@ impl BiancaBoard<'_> { } } - pub fn hgx_gpu_chassis( - &self, - ids: [GpuChassisIds; 2], - ) -> [redfish::chassis::SingleChassisConfig; 2] { - ids.map(|ids| { + fn gpu_base_index(&self) -> usize { + match self.index { + BoardIndex::Board0 => 0, + BoardIndex::Board1 => 2, + } + } + + pub fn gpu_chassis_ids(&self) -> [GpuChassisIds; 2] { + let base = self.gpu_base_index(); + [0, 1].map(|local| { + let n = base + local; + GpuChassisIds { + chassis_id: format!("HGX_GPU_{n}").into(), + pcie_device_id: format!("GPU_{n}").into(), + } + }) + } + + pub fn hgx_gpu_processors(&self, system_id: &str) -> [redfish::processor::Processor; 2] { + self.gpu_chassis_ids().map(|ids| { + redfish::processor::gpu( + system_id, + &ids.pcie_device_id, + &format!("/redfish/v1/Chassis/{}/Sensors/Voltage_1", ids.chassis_id), + ) + }) + } + + pub fn hgx_gpu_chassis(&self) -> [redfish::chassis::SingleChassisConfig; 2] { + self.gpu_chassis_ids().map(|ids| { let sensors = redfish::sensor::generate_chassis_sensors( &ids.chassis_id, redfish::sensor::Layout { diff --git a/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs b/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs index 4d4b6c7323..b498a509e5 100644 --- a/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs +++ b/crates/bmc-mock/src/hw/wiwynn_gb200_nvl.rs @@ -94,6 +94,8 @@ impl WiwynnGB200Nvl<'_> { .build() })).collect(); + let hgx_baseboard_id = "HGX_Baseboard_0"; + redfish::computer_system::Config { systems: vec![ redfish::computer_system::SingleSystemConfig { @@ -121,7 +123,7 @@ impl WiwynnGB200Nvl<'_> { secure_boot_available: true, }, redfish::computer_system::SingleSystemConfig { - id: "HGX_Baseboard_0".into(), + id: hgx_baseboard_id.into(), manufacturer: Some("NVIDIA".into()), model: Some("GB200 NVL".into()), chassis: vec!["HGX_Chassis_0".into()], @@ -136,14 +138,9 @@ impl WiwynnGB200Nvl<'_> { log_services: None, storage: None, processors: Some( - (0..4) - .map(|n| { - redfish::processor::gpu( - "HGX_Baseboard_0", - &format!("GPU_{n}"), - &format!("/redfish/v1/Chassis/HGX_GPU_{n}/Sensors/Voltage_1"), - ) - }) + self.compute_board + .iter() + .flat_map(|board| board.hgx_gpu_processors(hgx_baseboard_id)) .collect(), ), secure_boot_available: false, @@ -215,36 +212,9 @@ impl WiwynnGB200Nvl<'_> { .map(|(index, id)| self.compute_board[index].hgx_cpu_chassis(id.into())), ) .chain( - [ - ( - 0, - [ - hw::nvidia_gb200::GpuChassisIds { - chassis_id: "HGX_GPU_0".into(), - pcie_device_id: "GPU_0".into(), - }, - hw::nvidia_gb200::GpuChassisIds { - chassis_id: "HGX_GPU_1".into(), - pcie_device_id: "GPU_1".into(), - }, - ], - ), - ( - 1, - [ - hw::nvidia_gb200::GpuChassisIds { - chassis_id: "HGX_GPU_2".into(), - pcie_device_id: "GPU_2".into(), - }, - hw::nvidia_gb200::GpuChassisIds { - chassis_id: "HGX_GPU_3".into(), - pcie_device_id: "GPU_3".into(), - }, - ], - ), - ] - .into_iter() - .flat_map(|(index, ids)| self.compute_board[index].hgx_gpu_chassis(ids)), + self.compute_board + .iter() + .flat_map(|board| board.hgx_gpu_chassis()), ) .chain( self.io_board From b92546f982feaaa663aaf4a53d44ffe40d4a79de Mon Sep 17 00:00:00 2001 From: Ivan Anisimov Date: Thu, 11 Jun 2026 11:52:43 -0700 Subject: [PATCH 4/5] feat: hw-health fetch non-sensor metrics data --- crates/bmc-mock/src/hw/nvidia_gb200.rs | 7 ++++++- crates/bmc-mock/src/redfish/processor.rs | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/bmc-mock/src/hw/nvidia_gb200.rs b/crates/bmc-mock/src/hw/nvidia_gb200.rs index ede6cfb74f..5ab264d293 100644 --- a/crates/bmc-mock/src/hw/nvidia_gb200.rs +++ b/crates/bmc-mock/src/hw/nvidia_gb200.rs @@ -100,7 +100,12 @@ impl BiancaBoard<'_> { redfish::processor::gpu( system_id, &ids.pcie_device_id, - &format!("/redfish/v1/Chassis/{}/Sensors/Voltage_1", ids.chassis_id), + redfish::sensor::chassis_resource( + &ids.chassis_id, + redfish::processor::VOLTAGE_SENSOR_NAME, + ) + .odata_id + .as_ref(), ) }) } diff --git a/crates/bmc-mock/src/redfish/processor.rs b/crates/bmc-mock/src/redfish/processor.rs index d3c25f57e5..0191fecb57 100644 --- a/crates/bmc-mock/src/redfish/processor.rs +++ b/crates/bmc-mock/src/redfish/processor.rs @@ -23,6 +23,8 @@ use crate::json::{JsonExt, JsonPatch}; use crate::redfish; use crate::redfish::Builder; +pub(crate) const VOLTAGE_SENSOR_NAME: &str = "Voltage_0"; + pub fn system_collection(system_id: &str) -> redfish::Collection<'static> { let odata_id = format!("/redfish/v1/Systems/{system_id}/Processors"); redfish::Collection { From 6367d815ed22cb0a54e7b47424b29d147679a144 Mon Sep 17 00:00:00 2001 From: Ivan Anisimov Date: Thu, 11 Jun 2026 13:07:39 -0700 Subject: [PATCH 5/5] feat: health metrics contain machine id --- crates/bmc-mock/src/hw/nvidia_gb200.rs | 11 +++++------ crates/bmc-mock/src/redfish/processor.rs | 2 -- crates/bmc-mock/src/redfish/sensor.rs | 8 ++++++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/bmc-mock/src/hw/nvidia_gb200.rs b/crates/bmc-mock/src/hw/nvidia_gb200.rs index 5ab264d293..c05e8749ac 100644 --- a/crates/bmc-mock/src/hw/nvidia_gb200.rs +++ b/crates/bmc-mock/src/hw/nvidia_gb200.rs @@ -97,15 +97,14 @@ impl BiancaBoard<'_> { pub fn hgx_gpu_processors(&self, system_id: &str) -> [redfish::processor::Processor; 2] { self.gpu_chassis_ids().map(|ids| { + let voltage_sensor_id = + redfish::sensor::sensor_id(redfish::sensor::SensorKind::Voltage, 1); redfish::processor::gpu( system_id, &ids.pcie_device_id, - redfish::sensor::chassis_resource( - &ids.chassis_id, - redfish::processor::VOLTAGE_SENSOR_NAME, - ) - .odata_id - .as_ref(), + redfish::sensor::chassis_resource(&ids.chassis_id, &voltage_sensor_id) + .odata_id + .as_ref(), ) }) } diff --git a/crates/bmc-mock/src/redfish/processor.rs b/crates/bmc-mock/src/redfish/processor.rs index 0191fecb57..d3c25f57e5 100644 --- a/crates/bmc-mock/src/redfish/processor.rs +++ b/crates/bmc-mock/src/redfish/processor.rs @@ -23,8 +23,6 @@ use crate::json::{JsonExt, JsonPatch}; use crate::redfish; use crate::redfish::Builder; -pub(crate) const VOLTAGE_SENSOR_NAME: &str = "Voltage_0"; - pub fn system_collection(system_id: &str) -> redfish::Collection<'static> { let odata_id = format!("/redfish/v1/Systems/{system_id}/Processors"); redfish::Collection { diff --git a/crates/bmc-mock/src/redfish/sensor.rs b/crates/bmc-mock/src/redfish/sensor.rs index 93293839fd..d01ad91b70 100644 --- a/crates/bmc-mock/src/redfish/sensor.rs +++ b/crates/bmc-mock/src/redfish/sensor.rs @@ -302,7 +302,7 @@ impl SensorBuilder { } #[derive(Debug, Clone, Copy)] -enum SensorKind { +pub enum SensorKind { Temperature, Fan, Power, @@ -475,6 +475,10 @@ pub fn generate_chassis_sensors(chassis_id: &str, layout: Layout) -> Vec sensors } +pub fn sensor_id(kind: SensorKind, index: usize) -> String { + format!("{}_{}", kind.id_prefix(), index) +} + fn append_sensors( sensors: &mut Vec, chassis_id: &str, @@ -483,7 +487,7 @@ fn append_sensors( rng: &mut impl Rng, ) { for index in 1..=count { - let sensor_id = format!("{}_{}", kind.id_prefix(), index); + let sensor_id = sensor_id(kind, index); let sensor_name = format!("{} {}", kind.name_prefix(), index); let sensor = builder(&chassis_resource(chassis_id, &sensor_id)) .name(&sensor_name)