From 3bf9dc34ee12600e6c04b3a9e192aa0bbcffe681 Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Tue, 26 May 2026 10:28:16 -0700 Subject: [PATCH 1/2] embedded-service/event: Break up existing `Sender` trait Break-out blocking send function into `BlockingSender` trait. Introduce `Immediate` wrapper to distinguish receivers that can implement some sort of recovery flow if events are dropped. --- embedded-service/src/event.rs | 58 +++++++++++++++++-- power-policy-service/src/service/consumer.rs | 12 ++-- power-policy-service/src/service/mod.rs | 7 ++- power-policy-service/src/service/provider.rs | 9 ++- power-policy-service/tests/common/mock.rs | 24 ++++---- thermal-service/src/fan.rs | 8 ++- thermal-service/src/sensor.rs | 32 +++++----- .../src/controller/event_receiver.rs | 5 +- type-c-service/src/controller/mod.rs | 28 ++++++--- type-c-service/src/controller/pd.rs | 12 +++- type-c-service/src/controller/power.rs | 20 +++++-- type-c-service/src/service/mod.rs | 9 +-- type-c-service/src/service/ucsi.rs | 6 +- 13 files changed, 149 insertions(+), 81 deletions(-) diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index a24736761..8bb779429 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -1,21 +1,27 @@ //! Common traits for event senders and receivers -use core::{future::ready, marker::PhantomData}; +use core::marker::PhantomData; use crate::error; use embassy_sync::{ blocking_mutex::raw::RawMutex, channel::{DynamicReceiver, DynamicSender, Receiver as ChannelReceiver, Sender as ChannelSender}, - pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, + pubsub::{DynImmediatePublisher, DynPublisher, DynSubscriber, WaitResult}, }; /// Common event sender trait pub trait Sender { /// Attempt to send an event /// - /// Return none if the event cannot currently be sent + /// Return none if sending the event would block. fn try_send(&mut self, event: E) -> Option<()>; +} + +/// A sender that can block +pub trait BlockingSender: Sender { /// Send an event + /// + /// This blocks if the event cannot be sent immediately. fn send(&mut self, event: E) -> impl Future; } @@ -29,11 +35,22 @@ pub trait Receiver { fn wait_next(&mut self) -> impl Future; } +/// Enum for receivers that can receive immediate events +#[derive(Clone)] +pub enum ImmediateEvent { + /// Event + Event(E), + /// Lagged events + Lagged(u64), +} + impl Sender for DynamicSender<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { DynamicSender::try_send(self, event).ok() } +} +impl BlockingSender for DynamicSender<'_, E> { fn send(&mut self, event: E) -> impl Future { DynamicSender::send(self, event) } @@ -50,13 +67,21 @@ impl Receiver for DynamicReceiver<'_, E> { } impl Sender for DynImmediatePublisher<'_, E> { + fn try_send(&mut self, event: E) -> Option<()> { + self.publish_immediate(event); + Some(()) + } +} + +impl Sender for DynPublisher<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { self.try_publish(event).ok() } +} +impl BlockingSender for DynPublisher<'_, E> { fn send(&mut self, event: E) -> impl Future { - self.publish_immediate(event); - ready(()) + self.publish(event) } } @@ -85,11 +110,30 @@ impl Receiver for DynSubscriber<'_, E> { } } +impl Receiver> for DynSubscriber<'_, E> { + fn try_next(&mut self) -> Option> { + match self.try_next_message() { + Some(WaitResult::Message(e)) => Some(ImmediateEvent::Event(e)), + Some(WaitResult::Lagged(e)) => Some(ImmediateEvent::Lagged(e)), + _ => None, + } + } + + async fn wait_next(&mut self) -> ImmediateEvent { + match self.next_message().await { + WaitResult::Message(e) => ImmediateEvent::Event(e), + WaitResult::Lagged(e) => ImmediateEvent::Lagged(e), + } + } +} + impl Sender for ChannelSender<'_, M, E, N> { fn try_send(&mut self, event: E) -> Option<()> { ChannelSender::try_send(self, event).ok() } +} +impl BlockingSender for ChannelSender<'_, M, E, N> { fn send(&mut self, event: E) -> impl Future { ChannelSender::send(self, event) } @@ -112,7 +156,9 @@ impl Sender for NoopSender { fn try_send(&mut self, _event: E) -> Option<()> { Some(()) } +} +impl BlockingSender for NoopSender { async fn send(&mut self, _event: E) {} } @@ -138,7 +184,9 @@ impl, F: FnMut(I) -> O> Sender for MapSender { fn try_send(&mut self, event: I) -> Option<()> { self.sender.try_send((self.map_fn)(event)) } +} +impl, F: FnMut(I) -> O> BlockingSender for MapSender { fn send(&mut self, event: I) -> impl Future { self.sender.send((self.map_fn)(event)) } diff --git a/power-policy-service/src/service/consumer.rs b/power-policy-service/src/service/consumer.rs index 77fffd30f..7de5ee0d6 100644 --- a/power-policy-service/src/service/consumer.rs +++ b/power-policy-service/src/service/consumer.rs @@ -117,8 +117,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { if unconstrained_new != self.state.unconstrained { info!("Unconstrained state changed: {:?}", unconstrained_new); self.state.unconstrained = unconstrained_new; - self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained)) - .await; + self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained)); } Ok(()) } @@ -157,8 +156,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { self.broadcast_event(ServiceEvent::ConsumerConnected( connected_consumer.psu, connected_consumer.consumer_power_capability, - )) - .await; + )); Ok(()) } @@ -205,8 +203,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { // so just continue execution. self.disconnect_chargers().await?; - self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)) - .await; + self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)); // Don't update the unconstrained here because this is a transitional state } @@ -249,8 +246,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { // Notify disconnect if recently detached consumer was previously attached. if let Some(current_consumer) = self.state.current_consumer_state { self.disconnect_chargers().await?; - self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)) - .await; + self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu)); } // No new consumer available self.state.current_consumer_state = None; diff --git a/power-policy-service/src/service/mod.rs b/power-policy-service/src/service/mod.rs index 587d46d83..e21bd0951 100644 --- a/power-policy-service/src/service/mod.rs +++ b/power-policy-service/src/service/mod.rs @@ -7,6 +7,7 @@ pub mod provider; pub mod registration; pub mod task; +use embedded_services::error; use embedded_services::named::Named; use embedded_services::{event::Sender, info, sync::Lockable, trace}; @@ -137,9 +138,11 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { } /// Send an event to all registered listeners - async fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) { + fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) { for sender in self.registration.event_senders() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send event to listener"); + } } } diff --git a/power-policy-service/src/service/provider.rs b/power-policy-service/src/service/provider.rs index e41c15d6a..0529fe1a7 100644 --- a/power-policy-service/src/service/provider.rs +++ b/power-policy-service/src/service/provider.rs @@ -96,13 +96,13 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { e } else { locked_requester.connect_provider(target_power).await?; - self.post_provider_connected(requester, target_power).await; + self.post_provider_connected(requester, target_power); Ok(()) } } /// Common logic for after a provider has successfully connected - async fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) { + fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) { if self .state .connected_providers @@ -111,8 +111,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { { error!("Tracked providers set is full"); } - self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power)) - .await; + self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power)); } /// Common logic for when a provider is removed @@ -137,7 +136,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> { self.state.current_provider_state.state = PowerState::Unlimited; } - self.broadcast_event(ServiceEvent::ProviderDisconnected(psu)).await; + self.broadcast_event(ServiceEvent::ProviderDisconnected(psu)); true } else { false diff --git a/power-policy-service/tests/common/mock.rs b/power-policy-service/tests/common/mock.rs index 0b97d0f0c..627a7ff02 100644 --- a/power-policy-service/tests/common/mock.rs +++ b/power-policy-service/tests/common/mock.rs @@ -47,21 +47,21 @@ impl<'a, S: Sender> Mock<'a, S> { pub async fn simulate_consumer_connection(&mut self, capability: ConsumerPowerCapability) { self.state.attach().unwrap(); - self.sender.send(EventData::Attached).await; + self.sender.try_send(EventData::Attached).unwrap(); self.state.update_consumer_power_capability(Some(capability)).unwrap(); self.sender - .send(EventData::UpdatedConsumerCapability(Some(capability))) - .await; + .try_send(EventData::UpdatedConsumerCapability(Some(capability))) + .unwrap(); } pub async fn simulate_detach(&mut self) { self.state.detach(); - self.sender.send(EventData::Detached).await; + self.sender.try_send(EventData::Detached).unwrap(); } pub async fn simulate_provider_connection(&mut self, capability: PowerCapability) { self.state.attach().unwrap(); - self.sender.send(EventData::Attached).await; + self.sender.try_send(EventData::Attached).unwrap(); let capability = Some(ProviderPowerCapability { capability, @@ -71,13 +71,13 @@ impl<'a, S: Sender> Mock<'a, S> { .update_requested_provider_power_capability(capability) .unwrap(); self.sender - .send(EventData::RequestedProviderCapability(capability)) - .await; + .try_send(EventData::RequestedProviderCapability(capability)) + .unwrap(); } pub async fn simulate_disconnect(&mut self) { self.state.disconnect(true).unwrap(); - self.sender.send(EventData::Disconnected).await; + self.sender.try_send(EventData::Disconnected).unwrap(); } pub async fn simulate_update_requested_provider_power_capability( @@ -88,8 +88,8 @@ impl<'a, S: Sender> Mock<'a, S> { .update_requested_provider_power_capability(capability) .unwrap(); self.sender - .send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) - .await + .try_send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) + .unwrap(); } } @@ -146,7 +146,9 @@ impl<'a> ExampleCharger<'a> { } pub async fn simulate_psu_state_change(&self, psu_state: charger::PsuState) { - self.sender.send(charger::EventData::PsuStateChange(psu_state)).await; + self.sender + .try_send(charger::EventData::PsuStateChange(psu_state)) + .unwrap(); } } diff --git a/thermal-service/src/fan.rs b/thermal-service/src/fan.rs index 3e0b24715..a4a04f9be 100644 --- a/thermal-service/src/fan.rs +++ b/thermal-service/src/fan.rs @@ -256,9 +256,11 @@ pub struct Runner<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, S, E, SAMPLE_BUF_LEN> { - async fn broadcast_event(&mut self, event: fan::Event) { + fn broadcast_event(&mut self, event: fan::Event) { for sender in self.event_senders.iter_mut() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send fan event"); + } } } @@ -350,7 +352,7 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const if let Err(e) = self.handle_fan_state(temp).await { error!("Error handling fan state transition, disabling auto control: {:?}", e); self.service.config.lock().await.auto_control = false; - self.broadcast_event(fan::Event::Failure(e)).await; + self.broadcast_event(fan::Event::Failure(e)); } let sleep_duration = self.service.config.lock().await.update_period; diff --git a/thermal-service/src/sensor.rs b/thermal-service/src/sensor.rs index 9c50e0717..3307fed06 100644 --- a/thermal-service/src/sensor.rs +++ b/thermal-service/src/sensor.rs @@ -212,9 +212,11 @@ pub struct Runner<'hw, T: sensor::Driver, E: Sender, const SAMPLE } impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, E, SAMPLE_BUF_LEN> { - async fn broadcast_event(&mut self, event: sensor::Event) { + fn broadcast_event(&mut self, event: sensor::Event) { for sender in self.event_senders.iter_mut() { - sender.send(event).await; + if sender.try_send(event).is_none() { + error!("Failed to send sensor event"); + } } } @@ -223,42 +225,34 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi if temp >= config.warn_high_threshold && !self.state.is_warn_high { self.state.is_warn_high = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnHigh)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnHigh)); } else if temp < (config.warn_high_threshold - config.hysteresis) && self.state.is_warn_high { self.state.is_warn_high = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnHigh)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnHigh)); } if temp <= config.warn_low_threshold && !self.state.is_warn_low { self.state.is_warn_low = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnLow)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::WarnLow)); } else if temp > (config.warn_low_threshold + config.hysteresis) && self.state.is_warn_low { self.state.is_warn_low = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnLow)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::WarnLow)); } if temp >= config.prochot_threshold && !self.state.is_prochot { self.state.is_prochot = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Prochot)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Prochot)); } else if temp < (config.prochot_threshold - config.hysteresis) && self.state.is_prochot { self.state.is_prochot = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Prochot)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Prochot)); } if temp >= config.critical_threshold && !self.state.is_critical { self.state.is_critical = true; - self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Critical)) - .await; + self.broadcast_event(sensor::Event::ThresholdExceeded(sensor::Threshold::Critical)); } else if temp < (config.critical_threshold - config.hysteresis) && self.state.is_critical { self.state.is_critical = false; - self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Critical)) - .await; + self.broadcast_event(sensor::Event::ThresholdCleared(sensor::Threshold::Critical)); } } } @@ -276,7 +270,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi Ok(temp) => temp, Err(e) => { self.service.config.lock().await.sampling_enabled = false; - self.broadcast_event(sensor::Event::Failure(e)).await; + self.broadcast_event(sensor::Event::Failure(e)); error!("Error sampling sensor, disabling sampling"); continue; } diff --git a/type-c-service/src/controller/event_receiver.rs b/type-c-service/src/controller/event_receiver.rs index 8a17f93a0..69825b581 100644 --- a/type-c-service/src/controller/event_receiver.rs +++ b/type-c-service/src/controller/event_receiver.rs @@ -3,6 +3,7 @@ use core::array; use core::future::pending; use embassy_futures::select::{Either, select}; use embassy_time::Timer; +use embedded_services::error; use embedded_services::event::{Receiver, Sender}; use embedded_services::sync::Lockable; @@ -32,8 +33,8 @@ impl> PortEventSplitter { /// Wait for the next interrupt event and forward it to the corresponding port receiver. pub async fn process_interrupts(&mut self, interrupts: [PortEventBitfield; N]) { for (interrupt, sender) in interrupts.into_iter().zip(self.sender.iter_mut()) { - if interrupt != PortEventBitfield::none() { - sender.send(interrupt).await; + if interrupt != PortEventBitfield::none() && sender.try_send(interrupt).is_none() { + error!("Failed to send port event"); } } } diff --git a/type-c-service/src/controller/mod.rs b/type-c-service/src/controller/mod.rs index a767bcc78..d8be4d342 100644 --- a/type-c-service/src/controller/mod.rs +++ b/type-c-service/src/controller/mod.rs @@ -149,7 +149,9 @@ impl< current_status: new_status, }); self.status = new_status; - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send port status type-C event"); + } Ok(event) } @@ -169,15 +171,23 @@ impl< return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::Attached) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::Attached) + .is_none() + { + error!("Failed to send power policy event"); + } } else { info!("Plug removed"); self.psu_state.detach(); - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::Detached) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::Detached) + .is_none() + { + error!("Failed to send power policy event"); + } } Ok(()) @@ -207,8 +217,8 @@ impl< event.status.set_new_power_contract_as_provider(true); } - if event != PortEventBitfield::none() { - self.loopback_sender.send(Loopback::PortEvent(event)).await; + if event != PortEventBitfield::none() && self.loopback_sender.try_send(Loopback::PortEvent(event)).is_none() { + error!("Failed to send loopback event"); } Ok(()) } diff --git a/type-c-service/src/controller/pd.rs b/type-c-service/src/controller/pd.rs index 736d8cc5d..ea84354ef 100644 --- a/type-c-service/src/controller/pd.rs +++ b/type-c-service/src/controller/pd.rs @@ -48,7 +48,9 @@ impl< }; let event = ServicePortEventData::Vdm(vdm_data); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send VDM type-C event"); + } Ok(Some(event)) } @@ -57,7 +59,9 @@ impl< debug!("({}): Processing DP status update event", self.name); let status = self.controller.lock().await.get_dp_status(self.port).await?; let event = ServicePortEventData::DpStatusUpdate(status); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send DP status update type-C event"); + } Ok(event) } @@ -66,7 +70,9 @@ impl< debug!("({}): PD alert: {:#?}", self.name, ado); if let Some(ado) = ado { let event = ServicePortEventData::Alert(ado); - self.type_c_sender.send(event).await; + if self.type_c_sender.try_send(event).is_none() { + error!("Failed to send PD alert type-C event"); + } Ok(Some(event)) } else { // For some reason we didn't read an alert, nothing to do diff --git a/type-c-service/src/controller/power.rs b/type-c-service/src/controller/power.rs index 8d48d3817..0d7a43e46 100644 --- a/type-c-service/src/controller/power.rs +++ b/type-c-service/src/controller/power.rs @@ -43,9 +43,13 @@ impl< error!("Failed to update consumer power capability: {:?}", e); return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::UpdatedConsumerCapability(available_sink_contract)) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::UpdatedConsumerCapability(available_sink_contract)) + .is_none() + { + error!("Failed to send updated consumer capability event"); + } Ok(()) } @@ -61,9 +65,13 @@ impl< error!("Failed to update requested provider power capability: {:?}", e); return Err(PdError::Failed); } - self.power_policy_sender - .send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) - .await; + if self + .power_policy_sender + .try_send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability)) + .is_none() + { + error!("Failed to send requested provider capability event"); + } Ok(()) } diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index b794b3a6a..566a3f22a 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -75,9 +75,11 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { } /// Send an event to all registered listeners - async fn broadcast_event(&mut self, event: ServiceEvent<'port, Reg::Port>) { + fn broadcast_event(&mut self, event: ServiceEvent<'port, Reg::Port>) { for sender in self.registration.event_senders() { - sender.send(event.clone()).await; + if sender.try_send(event.clone()).is_none() { + error!("Failed to send event to listener"); + } } } @@ -109,8 +111,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { event: EventData::DebugAccessory(DebugAccessoryData { connected: new_status.is_connected(), }), - }) - .await; + }); } self.handle_ucsi_port_event(port, GlobalPortId(self.get_port_index(port)? as u8), event, &new_status) diff --git a/type-c-service/src/service/ucsi.rs b/type-c-service/src/service/ucsi.rs index 6004d2fd1..c79966327 100644 --- a/type-c-service/src/service/ucsi.rs +++ b/type-c-service/src/service/ucsi.rs @@ -187,8 +187,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { // False here because the OPM gets notified by the CCI, don't need a separate notification notify_opm: false, }), - }) - .await; + }); self.set_cci_connector_change(cci); } @@ -388,8 +387,7 @@ impl<'port, Reg: Registration<'port>> Service<'port, Reg> { port: port_id, notify_opm, }), - }) - .await; + }); } else { // This shouldn't happen because we have a single slot per port // Would likely indicate that an invalid port ID got in somehow From 85833b34cf076d786bdf460557cc2f34053758af Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Tue, 26 May 2026 13:20:19 -0700 Subject: [PATCH 2/2] Rename traits --- embedded-service/src/event.rs | 30 ++++++------ power-policy-service/src/service/mod.rs | 2 +- .../src/service/registration.rs | 8 ++-- power-policy-service/tests/common/mock.rs | 10 ++-- thermal-service/src/fan.rs | 48 ++++++++++++++----- thermal-service/src/sensor.rs | 22 +++++---- .../src/controller/electrical_disconnect.rs | 8 ++-- .../src/controller/event_receiver.rs | 6 +-- type-c-service/src/controller/macros.rs | 4 +- .../src/controller/max_sink_voltage.rs | 8 ++-- type-c-service/src/controller/mod.rs | 20 ++++---- type-c-service/src/controller/pd.rs | 20 ++++---- type-c-service/src/controller/power.rs | 20 ++++---- type-c-service/src/controller/retimer.rs | 8 ++-- type-c-service/src/controller/type_c.rs | 8 ++-- type-c-service/src/controller/ucsi.rs | 8 ++-- type-c-service/src/service/mod.rs | 2 +- type-c-service/src/service/registration.rs | 8 ++-- type-c-service/tests/common/mod.rs | 10 ++-- 19 files changed, 135 insertions(+), 115 deletions(-) diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index 8bb779429..a5b54bdff 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -10,7 +10,7 @@ use embassy_sync::{ }; /// Common event sender trait -pub trait Sender { +pub trait NonBlockingSender { /// Attempt to send an event /// /// Return none if sending the event would block. @@ -18,7 +18,7 @@ pub trait Sender { } /// A sender that can block -pub trait BlockingSender: Sender { +pub trait Sender: NonBlockingSender { /// Send an event /// /// This blocks if the event cannot be sent immediately. @@ -44,13 +44,13 @@ pub enum ImmediateEvent { Lagged(u64), } -impl Sender for DynamicSender<'_, E> { +impl NonBlockingSender for DynamicSender<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { DynamicSender::try_send(self, event).ok() } } -impl BlockingSender for DynamicSender<'_, E> { +impl Sender for DynamicSender<'_, E> { fn send(&mut self, event: E) -> impl Future { DynamicSender::send(self, event) } @@ -66,20 +66,20 @@ impl Receiver for DynamicReceiver<'_, E> { } } -impl Sender for DynImmediatePublisher<'_, E> { +impl NonBlockingSender for DynImmediatePublisher<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { self.publish_immediate(event); Some(()) } } -impl Sender for DynPublisher<'_, E> { +impl NonBlockingSender for DynPublisher<'_, E> { fn try_send(&mut self, event: E) -> Option<()> { self.try_publish(event).ok() } } -impl BlockingSender for DynPublisher<'_, E> { +impl Sender for DynPublisher<'_, E> { fn send(&mut self, event: E) -> impl Future { self.publish(event) } @@ -127,13 +127,13 @@ impl Receiver> for DynSubscriber<'_, E> { } } -impl Sender for ChannelSender<'_, M, E, N> { +impl NonBlockingSender for ChannelSender<'_, M, E, N> { fn try_send(&mut self, event: E) -> Option<()> { ChannelSender::try_send(self, event).ok() } } -impl BlockingSender for ChannelSender<'_, M, E, N> { +impl Sender for ChannelSender<'_, M, E, N> { fn send(&mut self, event: E) -> impl Future { ChannelSender::send(self, event) } @@ -152,24 +152,24 @@ impl Receiver for ChannelReceiver<'_, M, E, N /// A sender that discards all events sent to it. pub struct NoopSender; -impl Sender for NoopSender { +impl NonBlockingSender for NoopSender { fn try_send(&mut self, _event: E) -> Option<()> { Some(()) } } -impl BlockingSender for NoopSender { +impl Sender for NoopSender { async fn send(&mut self, _event: E) {} } /// Applies a function on events before passing them to the wrapped sender -pub struct MapSender, F: FnMut(I) -> O> { +pub struct MapSender, F: FnMut(I) -> O> { sender: S, map_fn: F, _phantom: PhantomData<(I, O)>, } -impl, F: FnMut(I) -> O> MapSender { +impl, F: FnMut(I) -> O> MapSender { /// Create a new self pub fn new(sender: S, map_fn: F) -> Self { Self { @@ -180,13 +180,13 @@ impl, F: FnMut(I) -> O> MapSender { } } -impl, F: FnMut(I) -> O> Sender for MapSender { +impl, F: FnMut(I) -> O> NonBlockingSender for MapSender { fn try_send(&mut self, event: I) -> Option<()> { self.sender.try_send((self.map_fn)(event)) } } -impl, F: FnMut(I) -> O> BlockingSender for MapSender { +impl, F: FnMut(I) -> O> Sender for MapSender { fn send(&mut self, event: I) -> impl Future { self.sender.send((self.map_fn)(event)) } diff --git a/power-policy-service/src/service/mod.rs b/power-policy-service/src/service/mod.rs index e21bd0951..28f724220 100644 --- a/power-policy-service/src/service/mod.rs +++ b/power-policy-service/src/service/mod.rs @@ -9,7 +9,7 @@ pub mod task; use embedded_services::error; use embedded_services::named::Named; -use embedded_services::{event::Sender, info, sync::Lockable, trace}; +use embedded_services::{event::NonBlockingSender, info, sync::Lockable, trace}; use power_policy_interface::charger::{Charger, PsuState}; use power_policy_interface::{ diff --git a/power-policy-service/src/service/registration.rs b/power-policy-service/src/service/registration.rs index 722c832c7..d6301f413 100644 --- a/power-policy-service/src/service/registration.rs +++ b/power-policy-service/src/service/registration.rs @@ -1,12 +1,12 @@ //! Code related to registration with the power policy service. -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use power_policy_interface::{charger, psu, service::event::Event as ServiceEvent}; /// Registration trait that abstracts over various registration details. pub trait Registration<'device> { type Psu: Lockable + 'device; - type ServiceSender: Sender>; + type ServiceSender: NonBlockingSender>; type Charger: Lockable + 'device; /// Returns a slice to access PSU devices @@ -22,7 +22,7 @@ pub struct ArrayRegistration< 'device, Psu: Lockable + 'device, const PSU_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, Charger: Lockable + 'device, const CHARGER_COUNT: usize, @@ -39,7 +39,7 @@ impl< 'device, Psu: Lockable + 'device, const PSU_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, Charger: Lockable + 'device, const CHARGER_COUNT: usize, diff --git a/power-policy-service/tests/common/mock.rs b/power-policy-service/tests/common/mock.rs index 627a7ff02..04191064c 100644 --- a/power-policy-service/tests/common/mock.rs +++ b/power-policy-service/tests/common/mock.rs @@ -2,7 +2,7 @@ #![allow(dead_code)] use embassy_sync::{channel, mutex::Mutex, signal::Signal}; use embedded_batteries_async::charger::{MilliAmps, MilliVolts}; -use embedded_services::{GlobalRawMutex, event::Sender, info, named::Named}; +use embedded_services::{GlobalRawMutex, event::NonBlockingSender, info, named::Named}; use power_policy_interface::{ capability::{ConsumerPowerCapability, PowerCapability, ProviderFlags, ProviderPowerCapability}, charger, @@ -18,7 +18,7 @@ pub enum FnCall { Reset, } -pub struct Mock<'a, S: Sender> { +pub struct Mock<'a, S: NonBlockingSender> { sender: S, fn_call: &'a Signal, // Internal state @@ -26,7 +26,7 @@ pub struct Mock<'a, S: Sender> { name: &'static str, } -impl<'a, S: Sender> Mock<'a, S> { +impl<'a, S: NonBlockingSender> Mock<'a, S> { pub fn new(name: &'static str, sender: S, fn_call: &'a Signal) -> Self { Self { name, @@ -93,7 +93,7 @@ impl<'a, S: Sender> Mock<'a, S> { } } -impl<'a, S: Sender> Psu for Mock<'a, S> { +impl<'a, S: NonBlockingSender> Psu for Mock<'a, S> { async fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> Result<(), Error> { info!("Connect consumer {:#?}", capability); self.record_fn_call(FnCall::ConnectConsumer(capability)); @@ -121,7 +121,7 @@ impl<'a, S: Sender> Psu for Mock<'a, S> { } } -impl<'a, S: Sender> Named for Mock<'a, S> { +impl<'a, S: NonBlockingSender> Named for Mock<'a, S> { fn name(&self) -> &'static str { self.name } diff --git a/thermal-service/src/fan.rs b/thermal-service/src/fan.rs index a4a04f9be..d51e3f400 100644 --- a/thermal-service/src/fan.rs +++ b/thermal-service/src/fan.rs @@ -5,7 +5,7 @@ use embassy_sync::signal::Signal; use embassy_time::{Duration, Timer}; use embedded_fans_async::Error as _; use embedded_sensors_hal_async::temperature::DegreesCelsius; -use embedded_services::event::Sender; +use embedded_services::event::NonBlockingSender; use embedded_services::{GlobalRawMutex, error, trace}; use thermal_service_interface::{fan, sensor}; @@ -102,14 +102,20 @@ impl ServiceInner, const SAMPLE_BUF_LEN: usize> { +pub struct Service< + 'hw, + T: fan::Driver, + S: sensor::SensorService, + E: NonBlockingSender, + const SAMPLE_BUF_LEN: usize, +> { inner: &'hw ServiceInner, _phantom: PhantomData<(S, E)>, } // Note: We can't derive these traits because the compiler thinks our generics then need to be Copy + Clone, // but we only hold a reference and don't actually need to be that strict -impl, const SAMPLE_BUF_LEN: usize> Clone +impl, const SAMPLE_BUF_LEN: usize> Clone for Service<'_, T, S, E, SAMPLE_BUF_LEN> { fn clone(&self) -> Self { @@ -117,13 +123,13 @@ impl, const SAMP } } -impl, const SAMPLE_BUF_LEN: usize> Copy +impl, const SAMPLE_BUF_LEN: usize> Copy for Service<'_, T, S, E, SAMPLE_BUF_LEN> { } -impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const SAMPLE_BUF_LEN: usize> fan::FanService - for Service<'hw, T, S, E, SAMPLE_BUF_LEN> +impl<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> + fan::FanService for Service<'hw, T, S, E, SAMPLE_BUF_LEN> { async fn enable_auto_control(&self) -> Result<(), fan::Error> { self.inner.change_state(fan::State::Off).await?; @@ -222,7 +228,7 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const } /// Parameters required to initialize a fan service. -pub struct InitParams<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender> { +pub struct InitParams<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender> { /// The underlying fan driver this service will control. pub driver: T, /// Initial configuration for the fan service. @@ -247,13 +253,19 @@ impl Default for Resources, const SAMPLE_BUF_LEN: usize> { +pub struct Runner< + 'hw, + T: fan::Driver, + S: sensor::SensorService, + E: NonBlockingSender, + const SAMPLE_BUF_LEN: usize, +> { service: &'hw ServiceInner, sensor: S, event_senders: &'hw mut [E], } -impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: fan::Driver, S: sensor::SensorService, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, S, E, SAMPLE_BUF_LEN> { fn broadcast_event(&mut self, event: fan::Event) { @@ -366,8 +378,13 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService, E: Sender, const } } -impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> - odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, S, E, SAMPLE_BUF_LEN> +impl< + 'hw, + T: fan::Driver, + S: sensor::SensorService + 'hw, + E: NonBlockingSender + 'hw, + const SAMPLE_BUF_LEN: usize, +> odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, S, E, SAMPLE_BUF_LEN> { async fn run(mut self) -> embedded_services::Never { let service = self.service; @@ -377,8 +394,13 @@ impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender } } -impl<'hw, T: fan::Driver, S: sensor::SensorService + 'hw, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> - odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, S, E, SAMPLE_BUF_LEN> +impl< + 'hw, + T: fan::Driver, + S: sensor::SensorService + 'hw, + E: NonBlockingSender + 'hw, + const SAMPLE_BUF_LEN: usize, +> odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, S, E, SAMPLE_BUF_LEN> { type Runner = Runner<'hw, T, S, E, SAMPLE_BUF_LEN>; type Resources = Resources; diff --git a/thermal-service/src/sensor.rs b/thermal-service/src/sensor.rs index 3307fed06..79660905f 100644 --- a/thermal-service/src/sensor.rs +++ b/thermal-service/src/sensor.rs @@ -3,7 +3,7 @@ use core::marker::PhantomData; use embassy_sync::{mutex::Mutex, signal::Signal}; use embassy_time::{Duration, Timer, with_timeout}; use embedded_sensors_hal_async::temperature::DegreesCelsius; -use embedded_services::event::Sender; +use embedded_services::event::NonBlockingSender; use embedded_services::{GlobalRawMutex, error}; use thermal_service_interface::sensor; @@ -102,14 +102,14 @@ impl ServiceInner, const SAMPLE_BUF_LEN: usize> { +pub struct Service<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> { inner: &'hw ServiceInner, _phantom: PhantomData, } // Note: We can't derive these traits because the compiler thinks our generics then need to be Copy + Clone, // but we only hold a reference and don't actually need to be that strict -impl, const SAMPLE_BUF_LEN: usize> Clone +impl, const SAMPLE_BUF_LEN: usize> Clone for Service<'_, T, E, SAMPLE_BUF_LEN> { fn clone(&self) -> Self { @@ -117,12 +117,12 @@ impl, const SAMPLE_BUF_LEN: usize> C } } -impl, const SAMPLE_BUF_LEN: usize> Copy +impl, const SAMPLE_BUF_LEN: usize> Copy for Service<'_, T, E, SAMPLE_BUF_LEN> { } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> sensor::SensorService +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> sensor::SensorService for Service<'hw, T, E, SAMPLE_BUF_LEN> { async fn temperature(&self) -> DegreesCelsius { @@ -172,7 +172,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi } /// Parameters required to initialize a sensor service. -pub struct InitParams<'hw, T: sensor::Driver, E: Sender> { +pub struct InitParams<'hw, T: sensor::Driver, E: NonBlockingSender> { /// The underlying sensor driver this service will control. pub driver: T, /// Initial configuration for the sensor service. @@ -205,13 +205,15 @@ struct State { } /// A task runner for a sensor. Users must run this in an embassy task or similar async execution context. -pub struct Runner<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> { +pub struct Runner<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> { service: &'hw ServiceInner, event_senders: &'hw mut [E], state: State, } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> Runner<'hw, T, E, SAMPLE_BUF_LEN> { +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> + Runner<'hw, T, E, SAMPLE_BUF_LEN> +{ fn broadcast_event(&mut self, event: sensor::Event) { for sender in self.event_senders.iter_mut() { if sender.try_send(event).is_none() { @@ -257,7 +259,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi } } -impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: sensor::Driver, E: NonBlockingSender, const SAMPLE_BUF_LEN: usize> odp_service_common::runnable_service::ServiceRunner<'hw> for Runner<'hw, T, E, SAMPLE_BUF_LEN> { async fn run(mut self) -> embedded_services::Never { @@ -303,7 +305,7 @@ impl<'hw, T: sensor::Driver, E: Sender, const SAMPLE_BUF_LEN: usi } } -impl<'hw, T: sensor::Driver, E: Sender + 'hw, const SAMPLE_BUF_LEN: usize> +impl<'hw, T: sensor::Driver, E: NonBlockingSender + 'hw, const SAMPLE_BUF_LEN: usize> odp_service_common::runnable_service::Service<'hw> for Service<'hw, T, E, SAMPLE_BUF_LEN> { type Runner = Runner<'hw, T, E, SAMPLE_BUF_LEN>; diff --git a/type-c-service/src/controller/electrical_disconnect.rs b/type-c-service/src/controller/electrical_disconnect.rs index e4f92ffd4..a6a4fe0f1 100644 --- a/type-c-service/src/controller/electrical_disconnect.rs +++ b/type-c-service/src/controller/electrical_disconnect.rs @@ -1,7 +1,7 @@ //! Electrical disconnect port trait implementation use core::num::NonZeroU8; -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::controller::electrical_disconnect::ElectricalDisconnect; @@ -12,9 +12,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::electrical_disconnect::ElectricalDisconnect for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/event_receiver.rs b/type-c-service/src/controller/event_receiver.rs index 69825b581..53983c78d 100644 --- a/type-c-service/src/controller/event_receiver.rs +++ b/type-c-service/src/controller/event_receiver.rs @@ -4,7 +4,7 @@ use core::future::pending; use embassy_futures::select::{Either, select}; use embassy_time::Timer; use embedded_services::error; -use embedded_services::event::{Receiver, Sender}; +use embedded_services::event::{NonBlockingSender, Receiver}; use embedded_services::sync::Lockable; use crate::PortEventStreamer; @@ -19,12 +19,12 @@ pub trait InterruptReceiver { } /// Struct to send received interrupts to their corresponding port receivers -pub struct PortEventSplitter> { +pub struct PortEventSplitter> { /// Senders to forward port events to their corresponding port receivers sender: [S; N], } -impl> PortEventSplitter { +impl> PortEventSplitter { /// Create a new instance pub fn new(sender: [S; N]) -> Self { Self { sender } diff --git a/type-c-service/src/controller/macros.rs b/type-c-service/src/controller/macros.rs index 0fe30d216..90c4383e1 100644 --- a/type-c-service/src/controller/macros.rs +++ b/type-c-service/src/controller/macros.rs @@ -1,5 +1,5 @@ use embedded_services::{ - event::{Receiver, Sender}, + event::{NonBlockingSender, Receiver}, sync::Lockable, }; use type_c_interface::port::event::PortEventBitfield; @@ -20,7 +20,7 @@ pub struct PortComponents< PowerPolicyReceveiver: Receiver, LoopbackReceiver: Receiver, InterruptReceiver: Receiver, - InterruptSender: Sender, + InterruptSender: NonBlockingSender, > { /// Port instance pub port: &'a Port, diff --git a/type-c-service/src/controller/max_sink_voltage.rs b/type-c-service/src/controller/max_sink_voltage.rs index 37609c348..d4e259231 100644 --- a/type-c-service/src/controller/max_sink_voltage.rs +++ b/type-c-service/src/controller/max_sink_voltage.rs @@ -1,5 +1,5 @@ //! Max sink voltage port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::controller::max_sink_voltage::MaxSinkVoltage; @@ -10,9 +10,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::max_sink_voltage::MaxSinkVoltage for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/mod.rs b/type-c-service/src/controller/mod.rs index d8be4d342..c458eb3c7 100644 --- a/type-c-service/src/controller/mod.rs +++ b/type-c-service/src/controller/mod.rs @@ -1,5 +1,5 @@ //! Struct that manages per-port state, interfacing with a controller object that exposes multiple ports. -use embedded_services::{debug, error, event::Sender, info, named::Named, sync::Lockable}; +use embedded_services::{debug, error, event::NonBlockingSender, info, named::Named, sync::Lockable}; use embedded_usb_pd::{LocalPortId, PdError}; use power_policy_interface::psu::PsuState; use type_c_interface::control::pd::PortStatus; @@ -28,9 +28,9 @@ pub struct Port< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > { /// Local port port: LocalPortId, @@ -58,9 +58,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Create new Port instance @@ -228,9 +228,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Named for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { fn name(&self) -> &'static str { diff --git a/type-c-service/src/controller/pd.rs b/type-c-service/src/controller/pd.rs index ea84354ef..a1532cbc6 100644 --- a/type-c-service/src/controller/pd.rs +++ b/type-c-service/src/controller/pd.rs @@ -1,5 +1,5 @@ //! PD functionality unrelated to power contracts and general port status -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use embedded_usb_pd::ado::Ado; use embedded_usb_pd::vdm::structured::command::discover_identity::{sop, sop_prime}; @@ -22,9 +22,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Process a VDM event by retrieving the relevant VDM data from the `controller` for the appropriate `port`. @@ -85,9 +85,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::pd::Pd for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn get_port_status(&mut self) -> Result { @@ -175,9 +175,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::pd::StateMachine for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn set_pd_state_machine_config(&mut self, config: PdStateMachineConfig) -> Result<(), PdError> { diff --git a/type-c-service/src/controller/power.rs b/type-c-service/src/controller/power.rs index 0d7a43e46..84ceb8add 100644 --- a/type-c-service/src/controller/power.rs +++ b/type-c-service/src/controller/power.rs @@ -1,6 +1,6 @@ //! Module for power policy related functionality use embassy_time::{Duration, Instant}; -use embedded_services::{debug, error, event::Sender, info, sync::Lockable}; +use embedded_services::{debug, error, event::NonBlockingSender, info, sync::Lockable}; use embedded_usb_pd::{ PdError, constants::{T_PS_TRANSITION_EPR_MS, T_PS_TRANSITION_SPR_MS}, @@ -19,9 +19,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { /// Handle a new contract as consumer @@ -123,9 +123,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > Psu for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn disconnect(&mut self) -> Result<(), PsuError> { @@ -179,9 +179,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::power::SystemPowerStateStatus for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { diff --git a/type-c-service/src/controller/retimer.rs b/type-c-service/src/controller/retimer.rs index e55fc4fa7..dc5ed4ed7 100644 --- a/type-c-service/src/controller/retimer.rs +++ b/type-c-service/src/controller/retimer.rs @@ -1,5 +1,5 @@ //! Retimer port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::control::retimer::RetimerFwUpdateState; use type_c_interface::controller::retimer::Retimer; @@ -11,9 +11,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::retimer::Retimer for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn get_rt_fw_update_status(&mut self) -> Result { diff --git a/type-c-service/src/controller/type_c.rs b/type-c-service/src/controller/type_c.rs index 54e75bdc3..ea1e2e75f 100644 --- a/type-c-service/src/controller/type_c.rs +++ b/type-c-service/src/controller/type_c.rs @@ -1,5 +1,5 @@ //! Type-C state machine port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::PdError; use type_c_interface::control::type_c::TypeCStateMachineState; use type_c_interface::controller::type_c::StateMachine; @@ -11,9 +11,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::port::type_c::StateMachine for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn set_type_c_state_machine_config(&mut self, state: TypeCStateMachineState) -> Result<(), PdError> { diff --git a/type-c-service/src/controller/ucsi.rs b/type-c-service/src/controller/ucsi.rs index 3dfdbf6dd..ec786db4c 100644 --- a/type-c-service/src/controller/ucsi.rs +++ b/type-c-service/src/controller/ucsi.rs @@ -1,5 +1,5 @@ //! UCSI LPM port trait implementation -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::{PdError, ucsi::lpm}; use type_c_interface::ucsi::Lpm as UcsiLpm; @@ -10,9 +10,9 @@ impl< 'device, C: Lockable, Shared: Lockable, - TypeCSender: Sender, - PowerSender: Sender, - LoopbackSender: Sender, + TypeCSender: NonBlockingSender, + PowerSender: NonBlockingSender, + LoopbackSender: NonBlockingSender, > type_c_interface::ucsi::Lpm for Port<'device, C, Shared, TypeCSender, PowerSender, LoopbackSender> { async fn execute_lpm_command(&mut self, command: lpm::LocalCommand) -> Result, PdError> { diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index 566a3f22a..75db4b484 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -1,7 +1,7 @@ use core::marker::PhantomData; use core::ptr; -use embedded_services::event::Sender as _; +use embedded_services::event::NonBlockingSender as _; use embedded_services::named::Named as _; use embedded_services::sync::Lockable; use embedded_services::{debug, error, info, trace}; diff --git a/type-c-service/src/service/registration.rs b/type-c-service/src/service/registration.rs index 815b151cb..654736f55 100644 --- a/type-c-service/src/service/registration.rs +++ b/type-c-service/src/service/registration.rs @@ -1,6 +1,6 @@ //! Code related to registration with the type-C service -use embedded_services::{event::Sender, sync::Lockable}; +use embedded_services::{event::NonBlockingSender, sync::Lockable}; use embedded_usb_pd::{GlobalPortId, LocalPortId}; use type_c_interface::port::pd::Pd; use type_c_interface::service::event::Event as ServiceEvent; @@ -9,7 +9,7 @@ use type_c_interface::ucsi::Lpm as UcsiLpm; /// Registration trait that abstracts over various registration details. pub trait Registration<'port> { type Port: Lockable + 'port; - type ServiceSender: Sender>; + type ServiceSender: NonBlockingSender>; /// Returns a slice to access ports fn ports(&self) -> &[&'port Self::Port]; @@ -29,7 +29,7 @@ pub struct ArrayRegistration< 'port, Port: Lockable + 'port, const PORT_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, > { /// Array of registered ports @@ -44,7 +44,7 @@ impl< 'port, Port: Lockable + 'port, const PORT_COUNT: usize, - ServiceSender: Sender>, + ServiceSender: NonBlockingSender>, const SERVICE_SENDER_COUNT: usize, > Registration<'port> for ArrayRegistration<'port, Port, PORT_COUNT, ServiceSender, SERVICE_SENDER_COUNT> { diff --git a/type-c-service/tests/common/mod.rs b/type-c-service/tests/common/mod.rs index 5dbcfe27b..c5d8beef8 100644 --- a/type-c-service/tests/common/mod.rs +++ b/type-c-service/tests/common/mod.rs @@ -1,7 +1,7 @@ use std::mem::ManuallyDrop; use embassy_futures::{ - join::{join, join3}, + join::join3, select::{Either, select}, }; use embassy_sync::{ @@ -11,7 +11,7 @@ use embassy_sync::{ watch, }; use embassy_time::{Duration, with_timeout}; -use embedded_services::{GlobalRawMutex, event::Sender}; +use embedded_services::{GlobalRawMutex, event::NonBlockingSender}; use embedded_usb_pd::LocalPortId; use paste::paste; use power_policy_interface::charger::mock::NoopCharger; @@ -198,7 +198,7 @@ pub struct PowerPolicyServiceEventRouter<'port, 'ch> { type_c_sender: DynamicSender<'ch, power_policy_interface::service::event::EventData>, } -impl<'port, 'ch> Sender>> +impl<'port, 'ch> NonBlockingSender>> for PowerPolicyServiceEventRouter<'port, 'ch> { fn try_send( @@ -208,10 +208,6 @@ impl<'port, 'ch> Sender>) { - join(self.test_sender.send(event), self.type_c_sender.send(event.into())).await; - } } /// Power policy event loop task