diff --git a/bin/agent-data-plane/src/internal/env/autodiscovery.rs b/bin/agent-data-plane/src/internal/env/autodiscovery.rs index 4a78732691f..e3bb924ee2c 100644 --- a/bin/agent-data-plane/src/internal/env/autodiscovery.rs +++ b/bin/agent-data-plane/src/internal/env/autodiscovery.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; @@ -9,7 +10,8 @@ use saluki_env::autodiscovery::AutodiscoveryEvent; use saluki_env::AutodiscoveryProvider; use saluki_error::GenericError; use tokio::select; -use tokio::sync::broadcast::{self, Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{debug, warn}; @@ -21,9 +23,11 @@ use tracing::{debug, warn}; /// autodiscovery API operates in a streaming fashion, which the provider uses to then broadcast updates to subscribers. #[derive(Clone)] pub struct RemoteAgentAutodiscoveryProvider { - sender: Sender, + subscribers: AutodiscoverySubscribers, } +type AutodiscoverySubscribers = Arc>>>; + impl RemoteAgentAutodiscoveryProvider { /// Creates a new `RemoteAgentAutodiscoveryProvider` based on the given configuration, along with a [`Supervisor`] that /// drives the collection and broadcasting of autodiscovery events. @@ -33,12 +37,14 @@ impl RemoteAgentAutodiscoveryProvider { /// If the remote agent client couldn't be created, an error is returned. pub async fn from_configuration(config: &GenericConfiguration) -> Result<(Self, Supervisor), GenericError> { let client = RemoteAgentClient::from_configuration(config).await?; - let (sender, _) = broadcast::channel::(16); + let subscribers = Arc::new(Mutex::new(Vec::new())); - let provider = Self { sender: sender.clone() }; + let provider = Self { + subscribers: subscribers.clone(), + }; let mut supervisor = Supervisor::new("autodiscovery")?; - supervisor.add_worker(AutodiscoveryEventBroadcaster { client, sender }); + supervisor.add_worker(AutodiscoveryEventBroadcaster { client, subscribers }); Ok((provider, supervisor)) } @@ -47,13 +53,15 @@ impl RemoteAgentAutodiscoveryProvider { #[async_trait] impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider { async fn subscribe(&self) -> Option> { - Some(self.sender.subscribe()) + let (sender, receiver) = mpsc::channel::(16); + self.subscribers.lock().await.push(sender); + Some(receiver) } } struct AutodiscoveryEventBroadcaster { client: RemoteAgentClient, - sender: Sender, + subscribers: AutodiscoverySubscribers, } #[async_trait] @@ -64,19 +72,19 @@ impl Supervisable for AutodiscoveryEventBroadcaster { async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { let client = self.client.clone(); - let sender = self.sender.clone(); + let subscribers = self.subscribers.clone(); Ok(Box::pin(async move { select! { _ = process_shutdown => {}, - _ = run_ad_event_broadcaster(client, sender) => {}, + _ = run_ad_event_broadcaster(client, subscribers) => {}, } Ok(()) })) } } -async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender) { +async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, subscribers: AutodiscoverySubscribers) { debug!("Listening to autodiscovery events from remote agent."); loop { @@ -89,7 +97,7 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender< Ok(response) => { for proto_config in response.configs { let event = AutodiscoveryEvent::from(proto_config); - let _ = sender.send(event); + send_to_subscribers(&subscribers, event).await; } } Err(status) => { @@ -106,3 +114,16 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender< debug!("Autodiscovery event stream ended. Reconnecting..."); } } + +async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) { + let mut subscribers = subscribers.lock().await; + let mut active_subscribers = Vec::with_capacity(subscribers.len()); + + for sender in subscribers.drain(..) { + if sender.send(event.clone()).await.is_ok() { + active_subscribers.push(sender); + } + } + + *subscribers = active_subscribers; +} diff --git a/lib/saluki-env/src/autodiscovery/mod.rs b/lib/saluki-env/src/autodiscovery/mod.rs index 0234840f4b0..dcbeb286efd 100644 --- a/lib/saluki-env/src/autodiscovery/mod.rs +++ b/lib/saluki-env/src/autodiscovery/mod.rs @@ -13,7 +13,7 @@ use datadog_protos::agent::{ use fnv::FnvHasher; use saluki_error::GenericError; use stringtheory::MetaString; -use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::Receiver; use twox_hash::XxHash64; pub mod providers; @@ -367,6 +367,8 @@ pub enum AutodiscoveryEvent { #[async_trait] pub trait AutodiscoveryProvider { /// Subscribe to autodiscovery events. + /// + /// Returns `None` when no provider is configured. async fn subscribe(&self) -> Option>; } diff --git a/lib/saluki-env/src/autodiscovery/providers/boxed.rs b/lib/saluki-env/src/autodiscovery/providers/boxed.rs index 0a919127f05..1658b5aefb9 100644 --- a/lib/saluki-env/src/autodiscovery/providers/boxed.rs +++ b/lib/saluki-env/src/autodiscovery/providers/boxed.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::Receiver; use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider}; diff --git a/lib/saluki-env/src/autodiscovery/providers/local.rs b/lib/saluki-env/src/autodiscovery/providers/local.rs index 152483cf21b..64a46348581 100644 --- a/lib/saluki-env/src/autodiscovery/providers/local.rs +++ b/lib/saluki-env/src/autodiscovery/providers/local.rs @@ -1,14 +1,15 @@ use std::collections::BTreeMap; use std::collections::HashSet; use std::path::{Path, PathBuf}; +use std::sync::Arc; use async_trait::async_trait; use saluki_error::GenericError; use serde::Deserialize; use stringtheory::MetaString; use tokio::fs; -use tokio::sync::broadcast::{self, Receiver, Sender}; -use tokio::sync::OnceCell; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{Mutex, OnceCell}; use tokio::time::{interval, Duration}; use tracing::{debug, info, warn}; @@ -16,10 +17,12 @@ use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfi const BG_MONITOR_INTERVAL: u64 = 30; +type AutodiscoverySubscribers = Arc>>>; + /// A local auto-discovery provider that uses the file system. pub struct LocalAutodiscoveryProvider { search_paths: Vec, - sender: Sender, + subscribers: AutodiscoverySubscribers, listener_init: OnceCell<()>, } @@ -41,11 +44,9 @@ impl LocalAutodiscoveryProvider { }) .collect(); - let (sender, _) = broadcast::channel::(16); - Self { search_paths, - sender, + subscribers: Arc::new(Mutex::new(Vec::new())), listener_init: OnceCell::new(), } } @@ -53,7 +54,7 @@ impl LocalAutodiscoveryProvider { /// Starts a background task that periodically scans for configuration changes async fn start_background_monitor(&self, interval_sec: u64) { let mut interval = interval(Duration::from_secs(interval_sec)); - let sender = self.sender.clone(); + let subscribers = self.subscribers.clone(); let search_paths = self.search_paths.clone(); info!( @@ -68,7 +69,9 @@ impl LocalAutodiscoveryProvider { interval.tick().await; // Scan for configurations and emit events for changes - if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await { + if let Err(e) = + scan_and_emit_events(&search_paths, &mut known_configs, &subscribers, &mut configs).await + { warn!("Error scanning for configurations: {}", e); } } @@ -148,7 +151,7 @@ async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, /// needed. async fn process_yaml_file( path: PathBuf, check_name: &str, found_configs: &mut HashSet, known_configs: &mut HashSet, - sender: &Sender, configs: &mut BTreeMap, + subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap, ) { match parse_config_file(&path, check_name).await { Ok((config_id, config)) => { @@ -157,7 +160,7 @@ async fn process_yaml_file( if !known_configs.contains(&config_id) { debug!("New configuration found: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() }; - let _ = sender.send(event); + send_to_subscribers(subscribers, event).await; known_configs.insert(config_id.clone()); configs.insert(config_id, config); } else { @@ -167,7 +170,7 @@ async fn process_yaml_file( configs.insert(config_id.clone(), config.clone()); debug!("Configuration updated: {}", config_id); let event = AutodiscoveryEvent::CheckSchedule { config }; - let _ = sender.send(event); + send_to_subscribers(subscribers, event).await; } } } @@ -189,7 +192,7 @@ fn is_yaml_file(path: &Path) -> bool { /// - `/.d/*.yaml`—directory-based configs; the check name is derived /// from the directory stem (the `.d` suffix is stripped). async fn scan_and_emit_events( - paths: &[PathBuf], known_configs: &mut HashSet, sender: &Sender, + paths: &[PathBuf], known_configs: &mut HashSet, subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap, ) -> Result<(), GenericError> { let mut found_configs = HashSet::new(); @@ -202,7 +205,15 @@ async fn scan_and_emit_events( if is_yaml_file(&path) { // Flat YAML file: .yaml let check_name = path.file_stem().unwrap().to_string_lossy().into_owned(); - process_yaml_file(path, &check_name, &mut found_configs, known_configs, sender, configs).await; + process_yaml_file( + path, + &check_name, + &mut found_configs, + known_configs, + subscribers, + configs, + ) + .await; } else if path.is_dir() { // Directory: only process if it matches the `.d` convention let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); @@ -214,8 +225,15 @@ async fn scan_and_emit_events( while let Ok(Some(sub_entry)) = sub_entries.next_entry().await { let sub_path = sub_entry.path(); if is_yaml_file(&sub_path) { - process_yaml_file(sub_path, check_name, &mut found_configs, known_configs, sender, configs) - .await; + process_yaml_file( + sub_path, + check_name, + &mut found_configs, + known_configs, + subscribers, + configs, + ) + .await; } } } @@ -238,12 +256,25 @@ async fn scan_and_emit_events( // Create an unschedule Config event let event = AutodiscoveryEvent::CheckUnscheduled { config }; - let _ = sender.send(event); + send_to_subscribers(subscribers, event).await; } Ok(()) } +async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) { + let mut subscribers = subscribers.lock().await; + let mut active_subscribers = Vec::with_capacity(subscribers.len()); + + for sender in subscribers.drain(..) { + if sender.send(event.clone()).await.is_ok() { + active_subscribers.push(sender); + } + } + + *subscribers = active_subscribers; +} + #[async_trait] impl AutodiscoveryProvider for LocalAutodiscoveryProvider { async fn subscribe(&self) -> Option> { @@ -253,7 +284,9 @@ impl AutodiscoveryProvider for LocalAutodiscoveryProvider { }) .await; - Some(self.sender.subscribe()) + let (sender, receiver) = mpsc::channel::(16); + self.subscribers.lock().await.push(sender); + Some(receiver) } } @@ -305,6 +338,21 @@ mod tests { target_path } + // Copy a file from test_data to the temp directory with a different target name. + async fn copy_test_file_as(source_name: &str, target_name: &str, temp_dir: &Path) -> PathBuf { + let source_path = test_data_path().join(source_name); + let target_path = temp_dir.join(target_name); + + let content = fs::read_to_string(&source_path) + .await + .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path)); + + let mut file = fs::File::create(&target_path).await.unwrap(); + file.write_all(content.as_bytes()).await.unwrap(); + + target_path + } + #[tokio::test] async fn test_parse_config_file() { let test_file = test_data_path().join("test-config.yaml"); @@ -337,11 +385,17 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); - let (sender, mut receiver) = broadcast::channel::(10); - - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + let (sender, mut receiver) = mpsc::channel::(10); + let subscribers = Arc::new(Mutex::new(vec![sender])); + + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); assert_eq!(known_configs.len(), 1); @@ -370,6 +424,69 @@ mod tests { assert!(receiver.try_recv().is_err()); } + #[tokio::test] + async fn test_scan_and_emit_events_preserves_bursts() { + let dir = tempdir().unwrap(); + let _test_file1 = copy_test_file_as("config1.yaml", "config1.yaml", dir.path()).await; + let _test_file2 = copy_test_file_as("config1.yaml", "config2.yaml", dir.path()).await; + + let (sender, mut receiver) = mpsc::channel::(1); + let subscribers = Arc::new(Mutex::new(vec![sender])); + let search_path = dir.path().to_path_buf(); + + let scan = tokio::spawn(async move { + let mut known_configs = HashSet::new(); + let mut configs = BTreeMap::new(); + + scan_and_emit_events(&[search_path], &mut known_configs, &subscribers, &mut configs).await + }); + + let event1 = tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + let event2 = tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + + scan.await.unwrap().unwrap(); + + assert!(matches!(event1, AutodiscoveryEvent::CheckSchedule { .. })); + assert!(matches!(event2, AutodiscoveryEvent::CheckSchedule { .. })); + assert!(receiver.try_recv().is_err()); + } + + #[tokio::test] + async fn test_send_to_subscribers_fans_out_and_prunes_closed_receivers() { + let (sender1, mut receiver1) = mpsc::channel::(10); + let (sender2, mut receiver2) = mpsc::channel::(10); + let (closed_sender, closed_receiver) = mpsc::channel::(10); + drop(closed_receiver); + + let subscribers = Arc::new(Mutex::new(vec![sender1, sender2, closed_sender])); + let event = AutodiscoveryEvent::CheckSchedule { + config: CheckConfig { + name: MetaString::from("test-check"), + init_config: Data::default(), + instances: Vec::new(), + source: MetaString::from_static("local"), + }, + }; + + send_to_subscribers(&subscribers, event).await; + + assert_eq!(subscribers.lock().await.len(), 2); + assert!(matches!( + receiver1.try_recv().unwrap(), + AutodiscoveryEvent::CheckSchedule { .. } + )); + assert!(matches!( + receiver2.try_recv().unwrap(), + AutodiscoveryEvent::CheckSchedule { .. } + )); + } + #[tokio::test] async fn test_scan_and_emit_events_removed_config() { let dir = tempdir().unwrap(); @@ -387,11 +504,17 @@ mod tests { }, ); - let (sender, mut receiver) = broadcast::channel::(10); + let (sender, mut receiver) = mpsc::channel::(10); + let subscribers = Arc::new(Mutex::new(vec![sender])); - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); assert_eq!(known_configs.len(), 0); @@ -408,11 +531,17 @@ mod tests { let mut known_configs = HashSet::new(); let mut configs = BTreeMap::new(); - let (sender, mut receiver) = broadcast::channel::(10); - - scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs) - .await - .unwrap(); + let (sender, mut receiver) = mpsc::channel::(10); + let subscribers = Arc::new(Mutex::new(vec![sender])); + + scan_and_emit_events( + &[dir.path().to_path_buf()], + &mut known_configs, + &subscribers, + &mut configs, + ) + .await + .unwrap(); // One config file inside test-check.d/ assert_eq!(known_configs.len(), 1);