Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions bin/agent-data-plane/src/internal/env/autodiscovery.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
Expand All @@ -9,7 +10,8 @@ use saluki_env::autodiscovery::AutodiscoveryEvent;
use saluki_env::AutodiscoveryProvider;
use saluki_error::GenericError;
use tokio::select;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::{debug, warn};

Expand All @@ -21,9 +23,11 @@ use tracing::{debug, warn};
/// autodiscovery API operates in a streaming fashion, which the provider uses to then broadcast updates to subscribers.
#[derive(Clone)]
pub struct RemoteAgentAutodiscoveryProvider {
sender: Sender<AutodiscoveryEvent>,
subscribers: AutodiscoverySubscribers,
}

type AutodiscoverySubscribers = Arc<Mutex<Vec<Sender<AutodiscoveryEvent>>>>;

impl RemoteAgentAutodiscoveryProvider {
/// Creates a new `RemoteAgentAutodiscoveryProvider` based on the given configuration, along with a [`Supervisor`] that
/// drives the collection and broadcasting of autodiscovery events.
Expand All @@ -33,12 +37,14 @@ impl RemoteAgentAutodiscoveryProvider {
/// If the remote agent client couldn't be created, an error is returned.
pub async fn from_configuration(config: &GenericConfiguration) -> Result<(Self, Supervisor), GenericError> {
let client = RemoteAgentClient::from_configuration(config).await?;
let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(16);
let subscribers = Arc::new(Mutex::new(Vec::new()));

let provider = Self { sender: sender.clone() };
let provider = Self {
subscribers: subscribers.clone(),
};

let mut supervisor = Supervisor::new("autodiscovery")?;
supervisor.add_worker(AutodiscoveryEventBroadcaster { client, sender });
supervisor.add_worker(AutodiscoveryEventBroadcaster { client, subscribers });

Ok((provider, supervisor))
}
Expand All @@ -47,13 +53,15 @@ impl RemoteAgentAutodiscoveryProvider {
#[async_trait]
impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider {
async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
Some(self.sender.subscribe())
let (sender, receiver) = mpsc::channel::<AutodiscoveryEvent>(16);
self.subscribers.lock().await.push(sender);
Some(receiver)
}
}

struct AutodiscoveryEventBroadcaster {
client: RemoteAgentClient,
sender: Sender<AutodiscoveryEvent>,
subscribers: AutodiscoverySubscribers,
}

#[async_trait]
Expand All @@ -64,19 +72,19 @@ impl Supervisable for AutodiscoveryEventBroadcaster {

async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
let client = self.client.clone();
let sender = self.sender.clone();
let subscribers = self.subscribers.clone();

Ok(Box::pin(async move {
select! {
_ = process_shutdown => {},
_ = run_ad_event_broadcaster(client, sender) => {},
_ = run_ad_event_broadcaster(client, subscribers) => {},
}
Ok(())
}))
}
}

async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender<AutodiscoveryEvent>) {
async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, subscribers: AutodiscoverySubscribers) {
debug!("Listening to autodiscovery events from remote agent.");

loop {
Expand All @@ -89,7 +97,7 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender<
Ok(response) => {
for proto_config in response.configs {
let event = AutodiscoveryEvent::from(proto_config);
let _ = sender.send(event);
send_to_subscribers(&subscribers, event).await;
}
}
Err(status) => {
Expand All @@ -106,3 +114,16 @@ async fn run_ad_event_broadcaster(mut client: RemoteAgentClient, sender: Sender<
debug!("Autodiscovery event stream ended. Reconnecting...");
}
}

async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) {
let mut subscribers = subscribers.lock().await;
let mut active_subscribers = Vec::with_capacity(subscribers.len());

for sender in subscribers.drain(..) {
if sender.send(event.clone()).await.is_ok() {
active_subscribers.push(sender);
}
}

*subscribers = active_subscribers;
}
4 changes: 3 additions & 1 deletion lib/saluki-env/src/autodiscovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use datadog_protos::agent::{
use fnv::FnvHasher;
use saluki_error::GenericError;
use stringtheory::MetaString;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Receiver;
use twox_hash::XxHash64;

pub mod providers;
Expand Down Expand Up @@ -367,6 +367,8 @@ pub enum AutodiscoveryEvent {
#[async_trait]
pub trait AutodiscoveryProvider {
/// Subscribe to autodiscovery events.
///
/// Returns `None` when no provider is configured.
async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>>;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/saluki-env/src/autodiscovery/providers/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Receiver;

use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};

Expand Down
Loading
Loading