From 4e34b8807210aed16312879b067425185cb7901d Mon Sep 17 00:00:00 2001 From: Omri SirComp Date: Mon, 18 May 2026 09:05:51 +0300 Subject: [PATCH 1/3] fix(env): use mpsc for autodiscovery events --- .../src/internal/env/autodiscovery.rs | 16 +++-- lib/saluki-env/src/autodiscovery/mod.rs | 4 +- .../src/autodiscovery/providers/boxed.rs | 2 +- .../src/autodiscovery/providers/local.rs | 70 ++++++++++++++++--- 4 files changed, 75 insertions(+), 17 deletions(-) diff --git a/bin/agent-data-plane/src/internal/env/autodiscovery.rs b/bin/agent-data-plane/src/internal/env/autodiscovery.rs index 4a78732691f..db7e55977f2 100644 --- a/bin/agent-data-plane/src/internal/env/autodiscovery.rs +++ b/bin/agent-data-plane/src/internal/env/autodiscovery.rs @@ -8,8 +8,10 @@ use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, S use saluki_env::autodiscovery::AutodiscoveryEvent; use saluki_env::AutodiscoveryProvider; use saluki_error::GenericError; +use std::sync::Arc; use tokio::select; -use tokio::sync::broadcast::{self, Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{debug, warn}; @@ -22,6 +24,7 @@ use tracing::{debug, warn}; #[derive(Clone)] pub struct RemoteAgentAutodiscoveryProvider { sender: Sender, + receiver: Arc>>>, } impl RemoteAgentAutodiscoveryProvider { @@ -33,9 +36,12 @@ impl RemoteAgentAutodiscoveryProvider { /// If the remote agent client couldn't be created, an error is returned. pub async fn from_configuration(config: &GenericConfiguration) -> Result<(Self, Supervisor), GenericError> { let client = RemoteAgentClient::from_configuration(config).await?; - let (sender, _) = broadcast::channel::(16); + let (sender, receiver) = mpsc::channel::(16); - let provider = Self { sender: sender.clone() }; + let provider = Self { + sender: sender.clone(), + receiver: Arc::new(Mutex::new(Some(receiver))), + }; let mut supervisor = Supervisor::new("autodiscovery")?; supervisor.add_worker(AutodiscoveryEventBroadcaster { client, sender }); @@ -47,7 +53,7 @@ impl RemoteAgentAutodiscoveryProvider { #[async_trait] impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider { async fn subscribe(&self) -> Option> { - Some(self.sender.subscribe()) + self.receiver.lock().await.take() } } @@ -89,7 +95,7 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender< Ok(response) => { for proto_config in response.configs { let event = AutodiscoveryEvent::from(proto_config); - let _ = sender.send(event); + let _ = sender.send(event).await; } } Err(status) => { diff --git a/lib/saluki-env/src/autodiscovery/mod.rs b/lib/saluki-env/src/autodiscovery/mod.rs index 0234840f4b0..50a007a54b5 100644 --- a/lib/saluki-env/src/autodiscovery/mod.rs +++ b/lib/saluki-env/src/autodiscovery/mod.rs @@ -13,7 +13,7 @@ use datadog_protos::agent::{ use fnv::FnvHasher; use saluki_error::GenericError; use stringtheory::MetaString; -use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::Receiver; use twox_hash::XxHash64; pub mod providers; @@ -367,6 +367,8 @@ pub enum AutodiscoveryEvent { #[async_trait] pub trait AutodiscoveryProvider { /// Subscribe to autodiscovery events. + /// + /// Returns `None` when no provider is configured, or when the provider's event receiver has already been taken. async fn subscribe(&self) -> Option>; } diff --git a/lib/saluki-env/src/autodiscovery/providers/boxed.rs b/lib/saluki-env/src/autodiscovery/providers/boxed.rs index 0a919127f05..1658b5aefb9 100644 --- a/lib/saluki-env/src/autodiscovery/providers/boxed.rs +++ b/lib/saluki-env/src/autodiscovery/providers/boxed.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::Receiver; use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider}; diff --git a/lib/saluki-env/src/autodiscovery/providers/local.rs b/lib/saluki-env/src/autodiscovery/providers/local.rs index 152483cf21b..9049a8a344b 100644 --- a/lib/saluki-env/src/autodiscovery/providers/local.rs +++ b/lib/saluki-env/src/autodiscovery/providers/local.rs @@ -1,14 +1,15 @@ use std::collections::BTreeMap; use std::collections::HashSet; use std::path::{Path, PathBuf}; +use std::sync::Arc; use async_trait::async_trait; use saluki_error::GenericError; use serde::Deserialize; use stringtheory::MetaString; use tokio::fs; -use tokio::sync::broadcast::{self, Receiver, Sender}; -use tokio::sync::OnceCell; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{Mutex, OnceCell}; use tokio::time::{interval, Duration}; use tracing::{debug, info, warn}; @@ -20,6 +21,7 @@ const BG_MONITOR_INTERVAL: u64 = 30; pub struct LocalAutodiscoveryProvider { search_paths: Vec, sender: Sender, + receiver: Arc>>>, listener_init: OnceCell<()>, } @@ -41,11 +43,12 @@ impl LocalAutodiscoveryProvider { }) .collect(); - let (sender, _) = broadcast::channel::(16); + let (sender, receiver) = mpsc::channel::(16); Self { search_paths, sender, + receiver: Arc::new(Mutex::new(Some(receiver))), listener_init: OnceCell::new(), } } @@ -157,7 +160,7 @@ async fn process_yaml_file( if !known_configs.contains(&config_id) { debug!("New configuration found: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() }; - let _ = sender.send(event); + let _ = sender.send(event).await; known_configs.insert(config_id.clone()); configs.insert(config_id, config); } else { @@ -167,7 +170,7 @@ async fn process_yaml_file( configs.insert(config_id.clone(), config.clone()); debug!("Configuration updated: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config }; - let _ = sender.send(event); + let _ = sender.send(event).await; } } } @@ -238,7 +241,7 @@ async fn scan_and_emit_events( // Create an unschedule Config event let event = AutodiscoveryEvent::CheckUnscheduled { config }; - let _ = sender.send(event); + let _ = sender.send(event).await; } Ok(()) @@ -253,7 +256,7 @@ impl AutodiscoveryProvider for LocalAutodiscoveryProvider { }) .await; - Some(self.sender.subscribe()) + self.receiver.lock().await.take() } } @@ -305,6 +308,21 @@ mod tests { target_path } + // Copy a file from test_data to the temp directory with a different target name. + async fn copy_test_file_as(source_name: &str, target_name: &str, temp_dir: &Path) -> PathBuf { + let source_path = test_data_path().join(source_name); + let target_path = temp_dir.join(target_name); + + let content = fs::read_to_string(&source_path) + .await + .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path)); + + let mut file = fs::File::create(&target_path).await.unwrap(); + file.write_all(content.as_bytes()).await.unwrap(); + + target_path + } + #[tokio::test] async fn test_parse_config_file() { let test_file = test_data_path().join("test-config.yaml"); @@ -337,7 +355,7 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); - let (sender, mut receiver) = broadcast::channel::(10); + let (sender, mut receiver) = mpsc::channel::(10); scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) .await @@ -370,6 +388,38 @@ mod tests { assert!(receiver.try_recv().is_err()); } + #[tokio::test] + async fn test_scan_and_emit_events_preserves_bursts() { + let dir = tempdir().unwrap(); + let _test_file1 = copy_test_file_as("config1.yaml", "config1.yaml", dir.path()).await; + let _test_file2 = copy_test_file_as("config1.yaml", "config2.yaml", dir.path()).await; + + let (sender, mut receiver) = mpsc::channel::(1); + let search_path = dir.path().to_path_buf(); + + let scan = tokio::spawn(async move { + let mut known_configs = HashSet::new(); + let mut configs = BTreeMap::new(); + + scan_and_emit_events(&[search_path], &mut known_configs, &sender, &mut configs).await + }); + + let event1 = tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + let event2 = tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + + scan.await.unwrap().unwrap(); + + assert!(matches!(event1, AutodiscoveryEvent::CheckSchedule { .. })); + assert!(matches!(event2, AutodiscoveryEvent::CheckSchedule { .. })); + assert!(receiver.try_recv().is_err()); + } + #[tokio::test] async fn test_scan_and_emit_events_removed_config() { let dir = tempdir().unwrap(); @@ -387,7 +437,7 @@ mod tests { }, ); - let (sender, mut receiver) = broadcast::channel::(10); + let (sender, mut receiver) = mpsc::channel::(10); scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) .await @@ -408,7 +458,7 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); - let (sender, mut receiver) = broadcast::channel::(10); + let (sender, mut receiver) = mpsc::channel::(10); scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) .await From 21918bc79a1d537721e99966e6e37bcb88fbfc53 Mon Sep 17 00:00:00 2001 From: Omri SirComp Date: Wed, 20 May 2026 11:48:27 +0300 Subject: [PATCH 2/3] fix(env): support multiple autodiscovery subscribers --- .../src/internal/env/autodiscovery.rs | 39 +++-- lib/saluki-env/src/autodiscovery/mod.rs | 2 +- .../src/autodiscovery/providers/local.rs | 135 ++++++++++++++---- 3 files changed, 135 insertions(+), 41 deletions(-) diff --git a/bin/agent-data-plane/src/internal/env/autodiscovery.rs b/bin/agent-data-plane/src/internal/env/autodiscovery.rs index db7e55977f2..0d83349ff89 100644 --- a/bin/agent-data-plane/src/internal/env/autodiscovery.rs +++ b/bin/agent-data-plane/src/internal/env/autodiscovery.rs @@ -23,10 +23,11 @@ use tracing::{debug, warn}; /// autodiscovery API operates in a streaming fashion, which the provider uses to then broadcast updates to subscribers. #[derive(Clone)] pub struct RemoteAgentAutodiscoveryProvider { - sender: Sender, - receiver: Arc>>>, + subscribers: AutodiscoverySubscribers, } +type AutodiscoverySubscribers = Arc>>>; + impl RemoteAgentAutodiscoveryProvider { /// Creates a new `RemoteAgentAutodiscoveryProvider` based on the given configuration, along with a [`Supervisor`] that /// drives the collection and broadcasting of autodiscovery events. @@ -36,15 +37,14 @@ impl RemoteAgentAutodiscoveryProvider { /// If the remote agent client couldn't be created, an error is returned. pub async fn from_configuration(config: &GenericConfiguration) -> Result<(Self, Supervisor), GenericError> { let client = RemoteAgentClient::from_configuration(config).await?; - let (sender, receiver) = mpsc::channel::(16); + let subscribers = Arc::new(Mutex::new(Vec::new())); let provider = Self { - sender: sender.clone(), - receiver: Arc::new(Mutex::new(Some(receiver))), + subscribers: subscribers.clone(), }; let mut supervisor = Supervisor::new("autodiscovery")?; - supervisor.add_worker(AutodiscoveryEventBroadcaster { client, sender }); + supervisor.add_worker(AutodiscoveryEventBroadcaster { client, subscribers }); Ok((provider, supervisor)) } @@ -53,13 +53,15 @@ impl RemoteAgentAutodiscoveryProvider { #[async_trait] impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider { async fn subscribe(&self) -> Option> { - self.receiver.lock().await.take() + let (sender, receiver) = mpsc::channel::(16); + self.subscribers.lock().await.push(sender); + Some(receiver) } } struct AutodiscoveryEventBroadcaster { client: RemoteAgentClient, - sender: Sender, + subscribers: AutodiscoverySubscribers, } #[async_trait] @@ -70,19 +72,19 @@ impl Supervisable for AutodiscoveryEventBroadcaster { async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { let client = self.client.clone(); - let sender = self.sender.clone(); + let subscribers = self.subscribers.clone(); Ok(Box::pin(async move { select! { _ = process_shutdown => {}, - _ = run_ad_event_broadcaster(client, sender) => {}, + _ = run_ad_event_broadcaster(client, subscribers) => {}, } Ok(()) })) } } -async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender) { +async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, subscribers: AutodiscoverySubscribers) { debug!("Listening to autodiscovery events from remote agent."); loop { @@ -95,7 +97,7 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender< Ok(response) => { for proto_config in response.configs { let event = AutodiscoveryEvent::from(proto_config); - let _ = sender.send(event).await; + send_to_subscribers(&subscribers, event).await; } } Err(status) => { @@ -112,3 +114,16 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender< debug!("Autodiscovery event stream ended. Reconnecting..."); } } + +async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) { + let mut subscribers = subscribers.lock().await; + let mut active_subscribers = Vec::with_capacity(subscribers.len()); + + for sender in subscribers.drain(..) { + if sender.send(event.clone()).await.is_ok() { + active_subscribers.push(sender); + } + } + + *subscribers = active_subscribers; +} diff --git a/lib/saluki-env/src/autodiscovery/mod.rs b/lib/saluki-env/src/autodiscovery/mod.rs index 50a007a54b5..dcbeb286efd 100644 --- a/lib/saluki-env/src/autodiscovery/mod.rs +++ b/lib/saluki-env/src/autodiscovery/mod.rs @@ -368,7 +368,7 @@ pub enum AutodiscoveryEvent { pub trait AutodiscoveryProvider { /// Subscribe to autodiscovery events. /// - /// Returns `None` when no provider is configured, or when the provider's event receiver has already been taken. + /// Returns `None` when no provider is configured. async fn subscribe(&self) -> Option>; } diff --git a/lib/saluki-env/src/autodiscovery/providers/local.rs b/lib/saluki-env/src/autodiscovery/providers/local.rs index 9049a8a344b..64a46348581 100644 --- a/lib/saluki-env/src/autodiscovery/providers/local.rs +++ b/lib/saluki-env/src/autodiscovery/providers/local.rs @@ -17,11 +17,12 @@ use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfi const BG_MONITOR_INTERVAL: u64 = 30; +type AutodiscoverySubscribers = Arc>>>; + /// A local auto-discovery provider that uses the file system. pub struct LocalAutodiscoveryProvider { search_paths: Vec, - sender: Sender, - receiver: Arc>>>, + subscribers: AutodiscoverySubscribers, listener_init: OnceCell<()>, } @@ -43,12 +44,9 @@ impl LocalAutodiscoveryProvider { }) .collect(); - let (sender, receiver) = mpsc::channel::(16); - Self { search_paths, - sender, - receiver: Arc::new(Mutex::new(Some(receiver))), + subscribers: Arc::new(Mutex::new(Vec::new())), listener_init: OnceCell::new(), } } @@ -56,7 +54,7 @@ impl LocalAutodiscoveryProvider { /// Starts a background task that periodically scans for configuration changes async fn start_background_monitor(&self, interval_sec: u64) { let mut interval = interval(Duration::from_secs(interval_sec)); - let sender = self.sender.clone(); + let subscribers = self.subscribers.clone(); let search_paths = self.search_paths.clone(); info!( @@ -71,7 +69,9 @@ impl LocalAutodiscoveryProvider { interval.tick().await; // Scan for configurations and emit events for changes - if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await { + if let Err(e) = + scan_and_emit_events(&search_paths, &mut known_configs, &subscribers, &mut configs).await + { warn!("Error scanning for configurations: {}", e); } } @@ -151,7 +151,7 @@ async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, /// needed. async fn process_yaml_file( path: PathBuf, check_name: &str, found_configs: &mut HashSet, known_configs: &mut HashSet, - sender: &Sender, configs: &mut BTreeMap, + subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap, ) { match parse_config_file(&path, check_name).await { Ok((config_id, config)) => { @@ -160,7 +160,7 @@ async fn process_yaml_file( if !known_configs.contains(&config_id) { debug!("New configuration found: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() }; - let _ = sender.send(event).await; + send_to_subscribers(subscribers, event).await; known_configs.insert(config_id.clone()); configs.insert(config_id, config); } else { @@ -170,7 +170,7 @@ async fn process_yaml_file( configs.insert(config_id.clone(), config.clone()); debug!("Configuration updated: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config }; - let _ = sender.send(event).await; + send_to_subscribers(subscribers, event).await; } } } @@ -192,7 +192,7 @@ fn is_yaml_file(path: &Path) -> bool { /// - `/.d/*.yaml`—directory-based configs; the check name is derived /// from the directory stem (the `.d` suffix is stripped). async fn scan_and_emit_events( - paths: &[PathBuf], known_configs: &mut HashSet, sender: &Sender, + paths: &[PathBuf], known_configs: &mut HashSet, subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap, ) -> Result<(), GenericError> { let mut found_configs = HashSet::new(); @@ -205,7 +205,15 @@ async fn scan_and_emit_events( if is_yaml_file(&path) { // Flat YAML file: .yaml let check_name = path.file_stem().unwrap().to_string_lossy().into_owned(); - process_yaml_file(path, &check_name, &mut found_configs, known_configs, sender, configs).await; + process_yaml_file( + path, + &check_name, + &mut found_configs, + known_configs, + subscribers, + configs, + ) + .await; } else if path.is_dir() { // Directory: only process if it matches the `.d` convention let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); @@ -217,8 +225,15 @@ async fn scan_and_emit_events( while let Ok(Some(sub_entry)) = sub_entries.next_entry().await { let sub_path = sub_entry.path(); if is_yaml_file(&sub_path) { - process_yaml_file(sub_path, check_name, &mut found_configs, known_configs, sender, configs) - .await; + process_yaml_file( + sub_path, + check_name, + &mut found_configs, + known_configs, + subscribers, + configs, + ) + .await; } } } @@ -241,12 +256,25 @@ async fn scan_and_emit_events( // Create an unschedule Config event let event = AutodiscoveryEvent::CheckUnscheduled { config }; - let _ = sender.send(event).await; + send_to_subscribers(subscribers, event).await; } Ok(()) } +async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) { + let mut subscribers = subscribers.lock().await; + let mut active_subscribers = Vec::with_capacity(subscribers.len()); + + for sender in subscribers.drain(..) { + if sender.send(event.clone()).await.is_ok() { + active_subscribers.push(sender); + } + } + + *subscribers = active_subscribers; +} + #[async_trait] impl AutodiscoveryProvider for LocalAutodiscoveryProvider { async fn subscribe(&self) -> Option> { @@ -256,7 +284,9 @@ impl AutodiscoveryProvider for LocalAutodiscoveryProvider { }) .await; - self.receiver.lock().await.take() + let (sender, receiver) = mpsc::channel::(16); + self.subscribers.lock().await.push(sender); + Some(receiver) } } @@ -356,10 +386,16 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); let (sender, mut receiver) = mpsc::channel::(10); + let subscribers = Arc::new(Mutex::new(vec![sender])); - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); assert_eq!(known_configs.len(), 1); @@ -395,13 +431,14 @@ mod tests { let _test_file2 = copy_test_file_as("config1.yaml", "config2.yaml", dir.path()).await; let (sender, mut receiver) = mpsc::channel::(1); + let subscribers = Arc::new(Mutex::new(vec![sender])); let search_path = dir.path().to_path_buf(); let scan = tokio::spawn(async move { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); - scan_and_emit_events(&[search_path], &mut known_configs, &sender, &mut configs).await + scan_and_emit_events(&[search_path], &mut known_configs, &subscribers, &mut configs).await }); let event1 = tokio::time::timeout(Duration::from_secs(1), receiver.recv()) @@ -420,6 +457,36 @@ mod tests { assert!(receiver.try_recv().is_err()); } + #[tokio::test] + async fn test_send_to_subscribers_fans_out_and_prunes_closed_receivers() { + let (sender1, mut receiver1) = mpsc::channel::(10); + let (sender2, mut receiver2) = mpsc::channel::(10); + let (closed_sender, closed_receiver) = mpsc::channel::(10); + drop(closed_receiver); + + let subscribers = Arc::new(Mutex::new(vec![sender1, sender2, closed_sender])); + let event = AutodiscoveryEvent::CheckSchedule { + config: CheckConfig { + name: MetaString::from("test-check"), + init_config: Data::default(), + instances: Vec::new(), + source: MetaString::from_static("local"), + }, + }; + + send_to_subscribers(&subscribers, event).await; + + assert_eq!(subscribers.lock().await.len(), 2); + assert!(matches!( + receiver1.try_recv().unwrap(), + AutodiscoveryEvent::CheckSchedule { .. } + )); + assert!(matches!( + receiver2.try_recv().unwrap(), + AutodiscoveryEvent::CheckSchedule { .. } + )); + } + #[tokio::test] async fn test_scan_and_emit_events_removed_config() { let dir = tempdir().unwrap(); @@ -438,10 +505,16 @@ mod tests { ); let (sender, mut receiver) = mpsc::channel::(10); + let subscribers = Arc::new(Mutex::new(vec![sender])); - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); assert_eq!(known_configs.len(), 0); @@ -459,10 +532,16 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); let (sender, mut receiver) = mpsc::channel::(10); - - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + let subscribers = Arc::new(Mutex::new(vec![sender])); + + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); // One config file inside test-check.d/ assert_eq!(known_configs.len(), 1); From f0ba958b6d85d5b7d977a94067335917affef943 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 29 May 2026 14:11:36 -0700 Subject: [PATCH 3/3] chore: fix formatting Co-Authored-By: Claude Sonnet 4.6 --- bin/agent-data-plane/src/internal/env/autodiscovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/agent-data-plane/src/internal/env/autodiscovery.rs b/bin/agent-data-plane/src/internal/env/autodiscovery.rs index 0d83349ff89..e3bb924ee2c 100644 --- a/bin/agent-data-plane/src/internal/env/autodiscovery.rs +++ b/bin/agent-data-plane/src/internal/env/autodiscovery.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; @@ -8,7 +9,6 @@ use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, S use saluki_env::autodiscovery::AutodiscoveryEvent; use saluki_env::AutodiscoveryProvider; use saluki_error::GenericError; -use std::sync::Arc; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Mutex;