diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index a24736761..a5b54bdff 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -1,21 +1,27 @@ //! Common traits for event senders and receivers -use core::{future::ready, marker::PhantomData}; +use core::marker::PhantomData; use crate::error; use embassy_sync::{ blocking_mutex::raw::RawMutex, channel::{DynamicReceiver, DynamicSender, Receiver as ChannelReceiver, Sender as ChannelSender}, - pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, + pubsub::{DynImmediatePublisher, DynPublisher, DynSubscriber, WaitResult}, }; /// Common event sender trait -pub trait Sender { +pub trait NonBlockingSender { /// Attempt to send an event /// - /// Return none if the event cannot currently be sent + /// Return none if sending the event would block. fn try_send(&mut self, event: E) -> Option<()>; +} + +/// A sender that can block +pub trait Sender: NonBlockingSender { /// Send an event + /// + /// This blocks if the event cannot be sent immediately. fn send(&mut self, event: E) -> impl Future; } @@ -29,11 +35,22 @@ pub trait Receiver { fn wait_next(&mut self) -> impl Future; } -impl Sender for DynamicSender<'_, E> { +/// Enum for receivers that can receive immediate events +#[derive(Clone)] +pub enum ImmediateEvent { + /// Event + Event(E), + /// Lagged events + Lagged(u64), +} + +impl NonBlockingSender for DynamicSender<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { DynamicSender::try_send(self, event).ok() } +} +impl Sender for DynamicSender<'_, E> { fn send(&mut self, event: E) -> impl Future { DynamicSender::send(self, event) } @@ -49,14 +66,22 @@ impl Receiver for DynamicReceiver<'_, E> { } } -impl Sender for DynImmediatePublisher<'_, E> { +impl NonBlockingSender for DynImmediatePublisher<'_, E> { + fn try_send(&mut self, event: E) -> Option<()> { + self.publish_immediate(event); + Some(()) + } +} + +impl NonBlockingSender for DynPublisher<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { self.try_publish(event).ok() } +} +impl Sender for DynPublisher<'_, E> { fn send(&mut self, event: E) -> impl Future { - self.publish_immediate(event); - ready(()) + self.publish(event) } } @@ -85,11 +110,30 @@ impl Receiver for DynSubscriber<'_, E> { } } -impl Sender for ChannelSender<'_, M, E, N> { +impl Receiver> for DynSubscriber<'_, E> { + fn try_next(&mut self) -> Option> { + match self.try_next_message() { + Some(WaitResult::Message(e)) => Some(ImmediateEvent::Event(e)), + Some(WaitResult::Lagged(e)) => Some(ImmediateEvent::Lagged(e)), + _ => None, + } + } + + async fn wait_next(&mut self) -> ImmediateEvent { + match self.next_message().await { + WaitResult::Message(e) => ImmediateEvent::Event(e), + WaitResult::Lagged(e) => ImmediateEvent::Lagged(e), + } + } +} + +impl NonBlockingSender for ChannelSender<'_, M, E, N> { fn try_send(&mut self, event: E) -> Option<()> { ChannelSender::try_send(self, event).ok() } +} +impl Sender for ChannelSender<'_, M, E, N> { fn send(&mut self, event: E) -> impl Future { ChannelSender::send(self, event) } @@ -108,22 +152,24 @@ impl Receiver for ChannelReceiver<'_, M, E, N /// A sender that discards all events sent to it. pub struct NoopSender; -impl Sender for NoopSender { +impl NonBlockingSender for NoopSender { fn try_send(&mut self, _event: E) -> Option<()> { Some(()) } +} +impl Sender for NoopSender { async fn send(&mut self, _event: E) {} } /// Applies a function on events before passing them to the wrapped sender -pub struct MapSender, F: FnMut(I) -> O> { +pub struct MapSender, F: FnMut(I) -> O> { sender: S, map_fn: F, _phantom: PhantomData<(I, O)>, } -impl, F: FnMut(I) -> O> MapSender { +impl, F: FnMut(I) -> O> MapSender { /// Create a new self pub fn new(sender: S, map_fn: F) -> Self { Self { @@ -134,11 +180,13 @@ impl, F: FnMut(I) -> O> MapSender { } } -impl, F: FnMut(I) -> O> Sender for MapSender { +impl, F: FnMut(I) -> O> NonBlockingSender for MapSender { fn try_send(&mut self, event: I) -> Option<()> { self.sender.try_send((self.map_fn)(event)) } +} +impl, F: FnMut(I) -> O> Sender for MapSender { fn send(&mut self, event: I) -> impl Future { self.sender.send((self.map_fn)(event)) } diff --git a/power-policy-service/src/service/consumer.rs b/power-policy-service/src/service/consumer.rs index 77fffd30f..7de5ee0d6 100644 --- a/power-policy-service/src/service/consumer.rs +++ b/power-policy-service/src/service/consumer.rs @@ -117,8 +117,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { if unconstrained_new != self.state.unconstrained { info!("Unconstrained state changed: {:?}", unconstrained_new); self.state.unconstrained = unconstrained_new; - self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained)) - .await; + self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained)); } Ok(()) } @@ -157,8 +156,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { self.broadcast_event(ServiceEvent::ConsumerConnected( connected_consumer.psu, connected_consumer.consumer_power_capability, - )) - .await; + )); Ok(()) } @@ -205,8 +203,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { // so just continue execution. self.disconnect_chargers().await?; - self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)) - .await; + self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)); // Don't update the unconstrained here because this is a transitional state } @@ -249,8 +246,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { // Notify disconnect if recently detached consumer was previously attached. if let Some(current_consumer) = self.state.current_consumer_state { self.disconnect_chargers().await?; - self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)) - .await; + self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)); } // No new consumer available self.state.current_consumer_state = None; diff --git a/power-policy-service/src/service/mod.rs b/power-policy-service/src/service/mod.rs index 587d46d83..28f724220 100644 --- a/power-policy-service/src/service/mod.rs +++ b/power-policy-service/src/service/mod.rs @@ -7,8 +7,9 @@ pub mod provider; pub mod registration; pub mod task; +use embedded_services::error; use embedded_services::named::Named; -use embedded_services::{event::Sender, info, sync::Lockable, trace}; +use embedded_services::{event::NonBlockingSender, info, sync::Lockable, trace}; use power_policy_interface::charger::{Charger, PsuState}; use power_policy_interface::{ @@ -137,9 +138,11 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { } /// Send an event to all registered listeners - async fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) { + fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) { for sender in self.registration.event_senders() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send event to listener"); + } } } diff --git a/power-policy-service/src/service/provider.rs b/power-policy-service/src/service/provider.rs index e41c15d6a..0529fe1a7 100644 --- a/power-policy-service/src/service/provider.rs +++ b/power-policy-service/src/service/provider.rs @@ -96,13 +96,13 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { e } else { locked_requester.connect_provider(target_power).await?; - self.post_provider_connected(requester, target_power).await; + self.post_provider_connected(requester, target_power); Ok(()) } } /// Common logic for after a provider has successfully connected - async fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) { + fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) { if self .state .connected_providers @@ -111,8 +111,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { { error!("Tracked providers set is full"); } - self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power)) - .await; + self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power)); } /// Common logic for when a provider is removed @@ -137,7 +136,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { self.state.current_provider_state.state = PowerState::Unlimited; } - self.broadcast_event(ServiceEvent::ProviderDisconnected(psu)).await; + self.broadcast_event(ServiceEvent::ProviderDisconnected(psu)); true } else { false diff --git a/power-policy-service/src/service/registration.rs b/power-policy-service/src/service/registration.rs index 722c832c7..d6301f413 100644 --- a/power-policy-service/src/service/registration.rs +++ b/power-policy-service/src/service/registration.rs @@ -1,12 +1,12 @@ //! Code related to registration with the power policy service. -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use power_policy_interface::{charger, psu, service::event::Event as ServiceEvent}; /// Registration trait that abstracts over various registration details. pub trait Registration<'device> { type Psu: Lockable + 'device; - type ServiceSender: Sender>; + type ServiceSender: NonBlockingSender>; type Charger: Lockable + 'device; /// Returns a slice to access PSU devices @@ -22,7 +22,7 @@ pub struct ArrayRegistration< 'device, Psu: Lockable + 'device, const PSU_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, Charger: Lockable + 'device, const CHARGER_COUNT: usize, @@ -39,7 +39,7 @@ impl< 'device, Psu: Lockable + 'device, const PSU_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, Charger: Lockable + 'device, const CHARGER_COUNT: usize, diff --git a/power-policy-service/tests/common/mock.rs b/power-policy-service/tests/common/mock.rs index 0b97d0f0c..04191064c 100644 --- a/power-policy-service/tests/common/mock.rs +++ b/power-policy-service/tests/common/mock.rs @@ -2,7 +2,7 @@ #![allow(dead_code)] use embassy_sync::{channel, mutex::Mutex, signal::Signal}; use embedded_batteries_async::charger::{MilliAmps, MilliVolts}; -use embedded_services::{GlobalRawMutex, event::Sender, info, named::Named}; +use embedded_services::{GlobalRawMutex, event::NonBlockingSender, info, named::Named}; use power_policy_interface::{ capability::{ConsumerPowerCapability, PowerCapability, ProviderFlags, ProviderPowerCapability}, charger, @@ -18,7 +18,7 @@ pub enum FnCall { Reset, } -pub struct Mock<'a, S: Sender> { +pub struct Mock<'a, S: NonBlockingSender> { sender: S, fn_call: &'a Signal, // Internal state @@ -26,7 +26,7 @@ pub struct Mock<'a, S: Sender> { name: &'static str, } -impl<'a, S: Sender> Mock<'a, S> { +impl<'a, S: NonBlockingSender> Mock<'a, S> { pub fn new(name: &'static str, sender: S, fn_call: &'a Signal) -> Self { Self { name, @@ -47,21 +47,21 @@ impl<'a, S: Sender> Mock<'a, S> { pub async fn simulate_consumer_connection(&mut self, capability: ConsumerPowerCapability) { self.state.attach().unwrap(); - self.sender.send(EventData::Attached).await; + self.sender.try_send(EventData::Attached).unwrap(); self.state.update_consumer_power_capability(Some(capability)).unwrap(); self.sender - .send(EventData::UpdatedConsumerCapability(Some(capability))) - .await; + .try_send(EventData::UpdatedConsumerCapability(Some(capability))) + .unwrap(); } pub async fn simulate_detach(&mut self) { self.state.detach(); - self.sender.send(EventData::Detached).await; + self.sender.try_send(EventData::Detached).unwrap(); } pub async fn simulate_provider_connection(&mut self, capability: PowerCapability) { self.state.attach().unwrap(); - self.sender.send(EventData::Attached).await; + self.sender.try_send(EventData::Attached).unwrap(); let capability = Some(ProviderPowerCapability { capability, @@ -71,13 +71,13 @@ impl<'a, S: Sender> Mock<'a, S> { .update_requested_provider_power_capability(capability) .unwrap(); self.sender - .send(EventData::RequestedProviderCapability(capability)) - .await; + .try_send(EventData::RequestedProviderCapability(capability)) + .unwrap(); } pub async fn simulate_disconnect(&mut self) { self.state.disconnect(true).unwrap(); - self.sender.send(EventData::Disconnected).await; + self.sender.try_send(EventData::Disconnected).unwrap(); } pub async fn simulate_update_requested_provider_power_capability( @@ -88,12 +88,12 @@ impl<'a, S: Sender> Mock<'a, S> { .update_requested_provider_power_capability(capability) .unwrap(); self.sender - .send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) - .await + .try_send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) + .unwrap(); } } -impl<'a, S: Sender> Psu for Mock<'a, S> { +impl<'a, S: NonBlockingSender> Psu for Mock<'a, S> { async fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> Result<(), Error> { info!("Connect consumer {:#?}", capability); self.record_fn_call(FnCall::ConnectConsumer(capability)); @@ -121,7 +121,7 @@ impl<'a, S: Sender> Psu for Mock<'a, S> { } } -impl<'a, S: Sender> Named for Mock<'a, S> { +impl<'a, S: NonBlockingSender> Named for Mock<'a, S> { fn name(&self) -> &'static str { self.name } @@ -146,7 +146,9 @@ impl<'a> ExampleCharger<'a> { } pub async fn simulate_psu_state_change(&self, psu_state: charger::PsuState) { - self.sender.send(charger::EventData::PsuStateChange(psu_state)).await; + self.sender + .try_send(charger::EventData::PsuStateChange(psu_state)) + .unwrap(); } } diff --git a/thermal-service/src/fan.rs b/thermal-service/src/fan.rs index 3e0b24715..d51e3f400 100644 --- a/thermal-service/src/fan.rs +++ b/thermal-service/src/fan.rs @@ -5,7 +5,7 @@ use embassy_sync::signal::Signal; use embassy_time::{Duration, Timer}; use embedded_fans_async::Error as _; use embedded_sensors_hal_async::temperature::DegreesCelsius; -use embedded_services::event::Sender; +use embedded_services::event::NonBlockingSender; use embedded_services::{GlobalRawMutex, error, trace}; use thermal_service_interface::{fan, sensor}; @@ -102,14 +102,20 @@ impl ServiceInner, const SAMPLE_BUF_LEN: usize> { +pub struct Service< + 'hw, + T: fan::Driver, + S: sensor::SensorService, + E: NonBlockingSender, + const SAMPLE_BUF_LEN: usize, +> { inner: &'hw ServiceInner, _phantom: PhantomData<(S, E)>, } // Note: We can't derive these traits because the compiler thinks our generics then need to be Copy + Clone, // but we only hold a reference and don't actually need to be that strict -impl, const SAMPLE_BUF_LEN: usize> Clone +impl, const SAMPLE_BUF_LEN: usize> Clone for Service<'_, T, S, E, SAMPLE_BUF_LEN> { fn clone(&self) -> Self { @@ -117,13 +123,13 @@ impl, const SAMP } } -impl, const SAMPLE_BUF_LEN: usize> Copy +impl, const SAMPLE_BUF_LEN: usize> Copy for Service<'_, T, S, E, SAMPLE_BUF_LEN> { } -impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const SAMPLE_BUF_LEN: usize> fan::FanService - for Service<'hw, T, S, E, SAMPLE_BUF_LEN> +impl<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> + fan::FanService for Service<'hw, T, S, E, SAMPLE_BUF_LEN> { async fn enable_auto_control(&self) -> Result<(), fan::Error> { self.inner.change_state(fan::State::Off).await?; @@ -222,7 +228,7 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const } /// Parameters required to initialize a fan service. -pub struct InitParams<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender> { +pub struct InitParams<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender> { /// The underlying fan driver this service will control. pub driver: T, /// Initial configuration for the fan service. @@ -247,18 +253,26 @@ impl Default for Resources, const SAMPLE_BUF_LEN: usize> { +pub struct Runner< + 'hw, + T: fan::Driver, + S: sensor::SensorService, + E: NonBlockingSender, + const SAMPLE_BUF_LEN: usize, +> { service: &'hw ServiceInner, sensor: S, event_senders: &'hw mut [E], } -impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, S, E, SAMPLE_BUF_LEN> { - async fn broadcast_event(&mut self, event: fan::Event) { + fn broadcast_event(&mut self, event: fan::Event) { for sender in self.event_senders.iter_mut() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send fan event"); + } } } @@ -350,7 +364,7 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const if let Err(e) = self.handle_fan_state(temp).await { error!("Error handling fan state transition, disabling auto control: {:?}", e); self.service.config.lock().await.auto_control = false; - self.broadcast_event(fan::Event::Failure(e)).await; + self.broadcast_event(fan::Event::Failure(e)); } let sleep_duration = self.service.config.lock().await.update_period; @@ -364,8 +378,13 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const } } -impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> - odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, S, E, SAMPLE_BUF_LEN> +impl< + 'hw, + T: fan::Driver, + S: sensor::SensorService + 'hw, + E: NonBlockingSender + 'hw, + const SAMPLE_BUF_LEN: usize, +> odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, S, E, SAMPLE_BUF_LEN> { async fn run(mut self) -> embedded_services::Never { let service = self.service; @@ -375,8 +394,13 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender } } -impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> - odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, S, E, SAMPLE_BUF_LEN> +impl< + 'hw, + T: fan::Driver, + S: sensor::SensorService + 'hw, + E: NonBlockingSender + 'hw, + const SAMPLE_BUF_LEN: usize, +> odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, S, E, SAMPLE_BUF_LEN> { type Runner = Runner<'hw, T, S, E, SAMPLE_BUF_LEN>; type Resources = Resources; diff --git a/thermal-service/src/sensor.rs b/thermal-service/src/sensor.rs index 9c50e0717..79660905f 100644 --- a/thermal-service/src/sensor.rs +++ b/thermal-service/src/sensor.rs @@ -3,7 +3,7 @@ use core::marker::PhantomData; use embassy_sync::{mutex::Mutex, signal::Signal}; use embassy_time::{Duration, Timer, with_timeout}; use embedded_sensors_hal_async::temperature::DegreesCelsius; -use embedded_services::event::Sender; +use embedded_services::event::NonBlockingSender; use embedded_services::{GlobalRawMutex, error}; use thermal_service_interface::sensor; @@ -102,14 +102,14 @@ impl ServiceInner, const SAMPLE_BUF_LEN: usize> { +pub struct Service<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> { inner: &'hw ServiceInner, _phantom: PhantomData, } // Note: We can't derive these traits because the compiler thinks our generics then need to be Copy + Clone, // but we only hold a reference and don't actually need to be that strict -impl, const SAMPLE_BUF_LEN: usize> Clone +impl, const SAMPLE_BUF_LEN: usize> Clone for Service<'_, T, E, SAMPLE_BUF_LEN> { fn clone(&self) -> Self { @@ -117,12 +117,12 @@ impl, const SAMPLE_BUF_LEN: usize> C } } -impl, const SAMPLE_BUF_LEN: usize> Copy +impl, const SAMPLE_BUF_LEN: usize> Copy for Service<'_, T, E, SAMPLE_BUF_LEN> { } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> sensor::SensorService +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> sensor::SensorService for Service<'hw, T, E, SAMPLE_BUF_LEN> { async fn temperature(&self) -> DegreesCelsius { @@ -172,7 +172,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi } /// Parameters required to initialize a sensor service. -pub struct InitParams<'hw, T: sensor::Driver, E: Sender> { +pub struct InitParams<'hw, T: sensor::Driver, E: NonBlockingSender> { /// The underlying sensor driver this service will control. pub driver: T, /// Initial configuration for the sensor service. @@ -205,16 +205,20 @@ struct State { } /// A task runner for a sensor. Users must run this in an embassy task or similar async execution context. -pub struct Runner<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> { +pub struct Runner<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> { service: &'hw ServiceInner, event_senders: &'hw mut [E], state: State, } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, E, SAMPLE_BUF_LEN> { - async fn broadcast_event(&mut self, event: sensor::Event) { +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> + Runner<'hw, T, E, SAMPLE_BUF_LEN> +{ + fn broadcast_event(&mut self, event: sensor::Event) { for sender in self.event_senders.iter_mut() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send sensor event"); + } } } @@ -223,47 +227,39 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi if temp >= config.warn_high_threshold && !self.state.is_warn_high { self.state.is_warn_high = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnHigh)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnHigh)); } else if temp < (config.warn_high_threshold - config.hysteresis) && self.state.is_warn_high { self.state.is_warn_high = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnHigh)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnHigh)); } if temp <= config.warn_low_threshold && !self.state.is_warn_low { self.state.is_warn_low = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnLow)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnLow)); } else if temp > (config.warn_low_threshold + config.hysteresis) && self.state.is_warn_low { self.state.is_warn_low = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnLow)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnLow)); } if temp >= config.prochot_threshold && !self.state.is_prochot { self.state.is_prochot = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Prochot)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Prochot)); } else if temp < (config.prochot_threshold - config.hysteresis) && self.state.is_prochot { self.state.is_prochot = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Prochot)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Prochot)); } if temp >= config.critical_threshold && !self.state.is_critical { self.state.is_critical = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Critical)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Critical)); } else if temp < (config.critical_threshold - config.hysteresis) && self.state.is_critical { self.state.is_critical = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Critical)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Critical)); } } } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, E, SAMPLE_BUF_LEN> { async fn run(mut self) -> embedded_services::Never { @@ -276,7 +272,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi Ok(temp) => temp, Err(e) => { self.service.config.lock().await.sampling_enabled = false; - self.broadcast_event(sensor::Event::Failure(e)).await; + self.broadcast_event(sensor::Event::Failure(e)); error!("Error sampling sensor, disabling sampling"); continue; } @@ -309,7 +305,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi } } -impl<'hw, T: sensor::Driver, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: sensor::Driver, E: NonBlockingSender + 'hw, const SAMPLE_BUF_LEN: usize> odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, E, SAMPLE_BUF_LEN> { type Runner = Runner<'hw, T, E, SAMPLE_BUF_LEN>; diff --git a/type-c-service/src/controller/electrical_disconnect.rs b/type-c-service/src/controller/electrical_disconnect.rs index e4f92ffd4..a6a4fe0f1 100644 --- a/type-c-service/src/controller/electrical_disconnect.rs +++ b/type-c-service/src/controller/electrical_disconnect.rs @@ -1,7 +1,7 @@ //! Electrical disconnect port trait implementation use core::num::NonZeroU8; -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::controller::electrical_disconnect::ElectricalDisconnect; @@ -12,9 +12,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::electrical_disconnect::ElectricalDisconnect for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/event_receiver.rs b/type-c-service/src/controller/event_receiver.rs index 8a17f93a0..53983c78d 100644 --- a/type-c-service/src/controller/event_receiver.rs +++ b/type-c-service/src/controller/event_receiver.rs @@ -3,7 +3,8 @@ use core::array; use core::future::pending; use embassy_futures::select::{Either, select}; use embassy_time::Timer; -use embedded_services::event::{Receiver, Sender}; +use embedded_services::error; +use embedded_services::event::{NonBlockingSender, Receiver}; use embedded_services::sync::Lockable; use crate::PortEventStreamer; @@ -18,12 +19,12 @@ pub trait InterruptReceiver { } /// Struct to send received interrupts to their corresponding port receivers -pub struct PortEventSplitter> { +pub struct PortEventSplitter> { /// Senders to forward port events to their corresponding port receivers sender: [S; N], } -impl> PortEventSplitter { +impl> PortEventSplitter { /// Create a new instance pub fn new(sender: [S; N]) -> Self { Self { sender } @@ -32,8 +33,8 @@ impl> PortEventSplitter { /// Wait for the next interrupt event and forward it to the corresponding port receiver. pub async fn process_interrupts(&mut self, interrupts: [PortEventBitfield; N]) { for (interrupt, sender) in interrupts.into_iter().zip(self.sender.iter_mut()) { - if interrupt != PortEventBitfield::none() { - sender.send(interrupt).await; + if interrupt != PortEventBitfield::none() && sender.try_send(interrupt).is_none() { + error!("Failed to send port event"); } } } diff --git a/type-c-service/src/controller/macros.rs b/type-c-service/src/controller/macros.rs index 0fe30d216..90c4383e1 100644 --- a/type-c-service/src/controller/macros.rs +++ b/type-c-service/src/controller/macros.rs @@ -1,5 +1,5 @@ use embedded_services::{ - event::{Receiver, Sender}, + event::{NonBlockingSender, Receiver}, sync::Lockable, }; use type_c_interface::port::event::PortEventBitfield; @@ -20,7 +20,7 @@ pub struct PortComponents< PowerPolicyReceveiver: Receiver, LoopbackReceiver: Receiver, InterruptReceiver: Receiver, - InterruptSender: Sender, + InterruptSender: NonBlockingSender, > { /// Port instance pub port: &'a Port, diff --git a/type-c-service/src/controller/max_sink_voltage.rs b/type-c-service/src/controller/max_sink_voltage.rs index 37609c348..d4e259231 100644 --- a/type-c-service/src/controller/max_sink_voltage.rs +++ b/type-c-service/src/controller/max_sink_voltage.rs @@ -1,5 +1,5 @@ //! Max sink voltage port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::controller::max_sink_voltage::MaxSinkVoltage; @@ -10,9 +10,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::max_sink_voltage::MaxSinkVoltage for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/mod.rs b/type-c-service/src/controller/mod.rs index a767bcc78..c458eb3c7 100644 --- a/type-c-service/src/controller/mod.rs +++ b/type-c-service/src/controller/mod.rs @@ -1,5 +1,5 @@ //! Struct that manages per-port state, interfacing with a controller object that exposes multiple ports. -use embedded_services::{debug, error, event::Sender, info, named::Named, sync::Lockable}; +use embedded_services::{debug, error, event::NonBlockingSender, info, named::Named, sync::Lockable}; use embedded_usb_pd::{LocalPortId, PdError}; use power_policy_interface::psu::PsuState; use type_c_interface::control::pd::PortStatus; @@ -28,9 +28,9 @@ pub struct Port< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > { /// Local port port: LocalPortId, @@ -58,9 +58,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Create new Port instance @@ -149,7 +149,9 @@ impl< current_status: new_status, }); self.status = new_status; - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send port status type-C event"); + } Ok(event) } @@ -169,15 +171,23 @@ impl< return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::Attached) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::Attached) + .is_none() + { + error!("Failed to send power policy event"); + } } else { info!("Plug removed"); self.psu_state.detach(); - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::Detached) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::Detached) + .is_none() + { + error!("Failed to send power policy event"); + } } Ok(()) @@ -207,8 +217,8 @@ impl< event.status.set_new_power_contract_as_provider(true); } - if event != PortEventBitfield::none() { - self.loopback_sender.send(Loopback::PortEvent(event)).await; + if event != PortEventBitfield::none() && self.loopback_sender.try_send(Loopback::PortEvent(event)).is_none() { + error!("Failed to send loopback event"); } Ok(()) } @@ -218,9 +228,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Named for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { fn name(&self) -> &'static str { diff --git a/type-c-service/src/controller/pd.rs b/type-c-service/src/controller/pd.rs index 736d8cc5d..a1532cbc6 100644 --- a/type-c-service/src/controller/pd.rs +++ b/type-c-service/src/controller/pd.rs @@ -1,5 +1,5 @@ //! PD functionality unrelated to power contracts and general port status -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use embedded_usb_pd::ado::Ado; use embedded_usb_pd::vdm::structured::command::discover_identity::{sop, sop_prime}; @@ -22,9 +22,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Process a VDM event by retrieving the relevant VDM data from the `controller` for the appropriate `port`. @@ -48,7 +48,9 @@ impl< }; let event = ServicePortEventData::Vdm(vdm_data); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send VDM type-C event"); + } Ok(Some(event)) } @@ -57,7 +59,9 @@ impl< debug!("({}): Processing DP status update event", self.name); let status = self.controller.lock().await.get_dp_status(self.port).await?; let event = ServicePortEventData::DpStatusUpdate(status); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send DP status update type-C event"); + } Ok(event) } @@ -66,7 +70,9 @@ impl< debug!("({}): PD alert: {:#?}", self.name, ado); if let Some(ado) = ado { let event = ServicePortEventData::Alert(ado); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send PD alert type-C event"); + } Ok(Some(event)) } else { // For some reason we didn't read an alert, nothing to do @@ -79,9 +85,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::pd::Pd for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn get_port_status(&mut self) -> Result { @@ -169,9 +175,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::pd::StateMachine for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn set_pd_state_machine_config(&mut self, config: PdStateMachineConfig) -> Result<(), PdError> { diff --git a/type-c-service/src/controller/power.rs b/type-c-service/src/controller/power.rs index 8d48d3817..84ceb8add 100644 --- a/type-c-service/src/controller/power.rs +++ b/type-c-service/src/controller/power.rs @@ -1,6 +1,6 @@ //! Module for power policy related functionality use embassy_time::{Duration, Instant}; -use embedded_services::{debug, error, event::Sender, info, sync::Lockable}; +use embedded_services::{debug, error, event::NonBlockingSender, info, sync::Lockable}; use embedded_usb_pd::{ PdError, constants::{T_PS_TRANSITION_EPR_MS, T_PS_TRANSITION_SPR_MS}, @@ -19,9 +19,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Handle a new contract as consumer @@ -43,9 +43,13 @@ impl< error!("Failed to update consumer power capability: {:?}", e); return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::UpdatedConsumerCapability(available_sink_contract)) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::UpdatedConsumerCapability(available_sink_contract)) + .is_none() + { + error!("Failed to send updated consumer capability event"); + } Ok(()) } @@ -61,9 +65,13 @@ impl< error!("Failed to update requested provider power capability: {:?}", e); return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) + .is_none() + { + error!("Failed to send requested provider capability event"); + } Ok(()) } @@ -115,9 +123,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Psu for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn disconnect(&mut self) -> Result<(), PsuError> { @@ -171,9 +179,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::power::SystemPowerStateStatus for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/retimer.rs b/type-c-service/src/controller/retimer.rs index e55fc4fa7..dc5ed4ed7 100644 --- a/type-c-service/src/controller/retimer.rs +++ b/type-c-service/src/controller/retimer.rs @@ -1,5 +1,5 @@ //! Retimer port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::control::retimer::RetimerFwUpdateState; use type_c_interface::controller::retimer::Retimer; @@ -11,9 +11,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::retimer::Retimer for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn get_rt_fw_update_status(&mut self) -> Result { diff --git a/type-c-service/src/controller/type_c.rs b/type-c-service/src/controller/type_c.rs index 54e75bdc3..ea1e2e75f 100644 --- a/type-c-service/src/controller/type_c.rs +++ b/type-c-service/src/controller/type_c.rs @@ -1,5 +1,5 @@ //! Type-C state machine port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::control::type_c::TypeCStateMachineState; use type_c_interface::controller::type_c::StateMachine; @@ -11,9 +11,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::type_c::StateMachine for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn set_type_c_state_machine_config(&mut self, state: TypeCStateMachineState) -> Result<(), PdError> { diff --git a/type-c-service/src/controller/ucsi.rs b/type-c-service/src/controller/ucsi.rs index 3dfdbf6dd..ec786db4c 100644 --- a/type-c-service/src/controller/ucsi.rs +++ b/type-c-service/src/controller/ucsi.rs @@ -1,5 +1,5 @@ //! UCSI LPM port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::{PdError, ucsi::lpm}; use type_c_interface::ucsi::Lpm as UcsiLpm; @@ -10,9 +10,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::ucsi::Lpm for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn execute_lpm_command(&mut self, command: lpm::LocalCommand) -> Result, PdError> { diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index b794b3a6a..75db4b484 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -1,7 +1,7 @@ use core::marker::PhantomData; use core::ptr; -use embedded_services::event::Sender as _; +use embedded_services::event::NonBlockingSender as _; use embedded_services::named::Named as _; use embedded_services::sync::Lockable; use embedded_services::{debug, error, info, trace}; @@ -75,9 +75,11 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { } /// Send an event to all registered listeners - async fn broadcast_event(&mut self, event: ServiceEvent<'port, Reg::Port>) { + fn broadcast_event(&mut self, event: ServiceEvent<'port, Reg::Port>) { for sender in self.registration.event_senders() { - sender.send(event.clone()).await; + if sender.try_send(event.clone()).is_none() { + error!("Failed to send event to listener"); + } } } @@ -109,8 +111,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { event: EventData::DebugAccessory(DebugAccessoryData { connected: new_status.is_connected(), }), - }) - .await; + }); } self.handle_ucsi_port_event(port, GlobalPortId(self.get_port_index(port)? as u8), event, &new_status) diff --git a/type-c-service/src/service/registration.rs b/type-c-service/src/service/registration.rs index 815b151cb..654736f55 100644 --- a/type-c-service/src/service/registration.rs +++ b/type-c-service/src/service/registration.rs @@ -1,6 +1,6 @@ //! Code related to registration with the type-C service -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::{GlobalPortId, LocalPortId}; use type_c_interface::port::pd::Pd; use type_c_interface::service::event::Event as ServiceEvent; @@ -9,7 +9,7 @@ use type_c_interface::ucsi::Lpm as UcsiLpm; /// Registration trait that abstracts over various registration details. pub trait Registration<'port> { type Port: Lockable + 'port; - type ServiceSender: Sender>; + type ServiceSender: NonBlockingSender>; /// Returns a slice to access ports fn ports(&self) -> &[&'port Self::Port]; @@ -29,7 +29,7 @@ pub struct ArrayRegistration< 'port, Port: Lockable + 'port, const PORT_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, > { /// Array of registered ports @@ -44,7 +44,7 @@ impl< 'port, Port: Lockable + 'port, const PORT_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, > Registration<'port> for ArrayRegistration<'port, Port, PORT_COUNT, ServiceSender, SERVICE_SENDER_COUNT> { diff --git a/type-c-service/src/service/ucsi.rs b/type-c-service/src/service/ucsi.rs index 6004d2fd1..c79966327 100644 --- a/type-c-service/src/service/ucsi.rs +++ b/type-c-service/src/service/ucsi.rs @@ -187,8 +187,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { // False here because the OPM gets notified by the CCI, don't need a separate notification notify_opm: false, }), - }) - .await; + }); self.set_cci_connector_change(cci); } @@ -388,8 +387,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { port: port_id, notify_opm, }), - }) - .await; + }); } else { // This shouldn't happen because we have a single slot per port // Would likely indicate that an invalid port ID got in somehow diff --git a/type-c-service/tests/common/mod.rs b/type-c-service/tests/common/mod.rs index 5dbcfe27b..c5d8beef8 100644 --- a/type-c-service/tests/common/mod.rs +++ b/type-c-service/tests/common/mod.rs @@ -1,7 +1,7 @@ use std::mem::ManuallyDrop; use embassy_futures::{ - join::{join, join3}, + join::join3, select::{Either, select}, }; use embassy_sync::{ @@ -11,7 +11,7 @@ use embassy_sync::{ watch, }; use embassy_time::{Duration, with_timeout}; -use embedded_services::{GlobalRawMutex, event::Sender}; +use embedded_services::{GlobalRawMutex, event::NonBlockingSender}; use embedded_usb_pd::LocalPortId; use paste::paste; use power_policy_interface::charger::mock::NoopCharger; @@ -198,7 +198,7 @@ pub struct PowerPolicyServiceEventRouter<'port, 'ch> { type_c_sender: DynamicSender<'ch, power_policy_interface::service::event::EventData>, } -impl<'port, 'ch> Sender>> +impl<'port, 'ch> NonBlockingSender>> for PowerPolicyServiceEventRouter<'port, 'ch> { fn try_send( @@ -208,10 +208,6 @@ impl<'port, 'ch> Sender>) { - join(self.test_sender.send(event), self.type_c_sender.send(event.into())).await; - } } /// Power policy event loop task