Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 61 additions & 13 deletions embedded-service/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
//! Common traits for event senders and receivers
use core::{future::ready, marker::PhantomData};
use core::marker::PhantomData;

use crate::error;

use embassy_sync::{
blocking_mutex::raw::RawMutex,
channel::{DynamicReceiver, DynamicSender, Receiver as ChannelReceiver, Sender as ChannelSender},
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
pubsub::{DynImmediatePublisher, DynPublisher, DynSubscriber, WaitResult},
};

/// Common event sender trait
pub trait Sender<E> {
pub trait NonBlockingSender<E> {
/// Attempt to send an event
///
/// Return none if the event cannot currently be sent
/// Return none if sending the event would block.
fn try_send(&mut self, event: E) -> Option<()>;
}

/// A sender that can block
pub trait Sender<E>: NonBlockingSender<E> {
/// Send an event
///
/// This blocks if the event cannot be sent immediately.
fn send(&mut self, event: E) -> impl Future<Output = ()>;
}

Expand All @@ -29,11 +35,22 @@ pub trait Receiver<E> {
fn wait_next(&mut self) -> impl Future<Output = E>;
}

impl<E> Sender<E> for DynamicSender<'_, E> {
/// Enum for receivers that can receive immediate events
#[derive(Clone)]
pub enum ImmediateEvent<E> {
/// Event
Event(E),
/// Lagged events
Lagged(u64),
}

impl<E> NonBlockingSender<E> for DynamicSender<'_, E> {
fn try_send(&mut self, event: E) -> Option<()> {
DynamicSender::try_send(self, event).ok()
}
}

impl<E> Sender<E> for DynamicSender<'_, E> {
fn send(&mut self, event: E) -> impl Future<Output = ()> {
DynamicSender::send(self, event)
}
Expand All @@ -49,14 +66,22 @@ impl<E> Receiver<E> for DynamicReceiver<'_, E> {
}
}

impl<E: Clone> Sender<E> for DynImmediatePublisher<'_, E> {
impl<E: Clone> NonBlockingSender<E> for DynImmediatePublisher<'_, E> {
fn try_send(&mut self, event: E) -> Option<()> {
self.publish_immediate(event);
Some(())
}
}

Comment thread
RobertZ2011 marked this conversation as resolved.
impl<E: Clone> NonBlockingSender<E> for DynPublisher<'_, E> {
fn try_send(&mut self, event: E) -> Option<()> {
self.try_publish(event).ok()
}
}

impl<E: Clone> Sender<E> for DynPublisher<'_, E> {
fn send(&mut self, event: E) -> impl Future<Output = ()> {
self.publish_immediate(event);
ready(())
self.publish(event)
}
}

Expand Down Expand Up @@ -85,11 +110,30 @@ impl<E: Clone> Receiver<E> for DynSubscriber<'_, E> {
}
}

impl<M: RawMutex, E, const N: usize> Sender<E> for ChannelSender<'_, M, E, N> {
impl<E: Clone> Receiver<ImmediateEvent<E>> for DynSubscriber<'_, E> {
fn try_next(&mut self) -> Option<ImmediateEvent<E>> {
match self.try_next_message() {
Some(WaitResult::Message(e)) => Some(ImmediateEvent::Event(e)),
Some(WaitResult::Lagged(e)) => Some(ImmediateEvent::Lagged(e)),
_ => None,
}
}

async fn wait_next(&mut self) -> ImmediateEvent<E> {
match self.next_message().await {
WaitResult::Message(e) => ImmediateEvent::Event(e),
WaitResult::Lagged(e) => ImmediateEvent::Lagged(e),
}
}
}

impl<M: RawMutex, E, const N: usize> NonBlockingSender<E> for ChannelSender<'_, M, E, N> {
fn try_send(&mut self, event: E) -> Option<()> {
ChannelSender::try_send(self, event).ok()
}
}

impl<M: RawMutex, E, const N: usize> Sender<E> for ChannelSender<'_, M, E, N> {
fn send(&mut self, event: E) -> impl Future<Output = ()> {
ChannelSender::send(self, event)
}
Expand All @@ -108,22 +152,24 @@ impl<M: RawMutex, E, const N: usize> Receiver<E> for ChannelReceiver<'_, M, E, N
/// A sender that discards all events sent to it.
pub struct NoopSender;

impl<E> Sender<E> for NoopSender {
impl<E> NonBlockingSender<E> for NoopSender {
fn try_send(&mut self, _event: E) -> Option<()> {
Some(())
}
}

impl<E> Sender<E> for NoopSender {
async fn send(&mut self, _event: E) {}
}

/// Applies a function on events before passing them to the wrapped sender
pub struct MapSender<I, O, S: Sender<O>, F: FnMut(I) -> O> {
pub struct MapSender<I, O, S: NonBlockingSender<O>, F: FnMut(I) -> O> {
sender: S,
map_fn: F,
_phantom: PhantomData<(I, O)>,
}

impl<I, O, S: Sender<O>, F: FnMut(I) -> O> MapSender<I, O, S, F> {
impl<I, O, S: NonBlockingSender<O>, F: FnMut(I) -> O> MapSender<I, O, S, F> {
/// Create a new self
pub fn new(sender: S, map_fn: F) -> Self {
Self {
Expand All @@ -134,11 +180,13 @@ impl<I, O, S: Sender<O>, F: FnMut(I) -> O> MapSender<I, O, S, F> {
}
}

impl<I, O, S: Sender<O>, F: FnMut(I) -> O> Sender<I> for MapSender<I, O, S, F> {
impl<I, O, S: NonBlockingSender<O>, F: FnMut(I) -> O> NonBlockingSender<I> for MapSender<I, O, S, F> {
fn try_send(&mut self, event: I) -> Option<()> {
self.sender.try_send((self.map_fn)(event))
}
}

impl<I, O, S: Sender<O>, F: FnMut(I) -> O> Sender<I> for MapSender<I, O, S, F> {
fn send(&mut self, event: I) -> impl Future<Output = ()> {
self.sender.send((self.map_fn)(event))
}
Expand Down
12 changes: 4 additions & 8 deletions power-policy-service/src/service/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
if unconstrained_new != self.state.unconstrained {
info!("Unconstrained state changed: {:?}", unconstrained_new);
self.state.unconstrained = unconstrained_new;
self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained))
.await;
self.broadcast_event(ServiceEvent::Unconstrained(self.state.unconstrained));
}
Ok(())
}
Expand Down Expand Up @@ -157,8 +156,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
self.broadcast_event(ServiceEvent::ConsumerConnected(
connected_consumer.psu,
connected_consumer.consumer_power_capability,
))
.await;
));

Ok(())
}
Expand Down Expand Up @@ -205,8 +203,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
// so just continue execution.
self.disconnect_chargers().await?;

self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu))
.await;
self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu));

// Don't update the unconstrained here because this is a transitional state
}
Expand Down Expand Up @@ -249,8 +246,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
// Notify disconnect if recently detached consumer was previously attached.
if let Some(current_consumer) = self.state.current_consumer_state {
self.disconnect_chargers().await?;
self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu))
.await;
self.broadcast_event(ServiceEvent::ConsumerDisconnected(current_consumer.psu));
}
// No new consumer available
self.state.current_consumer_state = None;
Expand Down
9 changes: 6 additions & 3 deletions power-policy-service/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ pub mod provider;
pub mod registration;
pub mod task;

use embedded_services::error;
use embedded_services::named::Named;
use embedded_services::{event::Sender, info, sync::Lockable, trace};
use embedded_services::{event::NonBlockingSender, info, sync::Lockable, trace};

use power_policy_interface::charger::{Charger, PsuState};
use power_policy_interface::{
Expand Down Expand Up @@ -137,9 +138,11 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
}

/// Send an event to all registered listeners
async fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) {
fn broadcast_event(&mut self, event: ServiceEvent<'device, Reg::Psu>) {
for sender in self.registration.event_senders() {
sender.send(event).await;
if sender.try_send(event).is_none() {
error!("Failed to send event to listener");
Comment thread
RobertZ2011 marked this conversation as resolved.
}
}
}

Expand Down
9 changes: 4 additions & 5 deletions power-policy-service/src/service/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
e
} else {
locked_requester.connect_provider(target_power).await?;
self.post_provider_connected(requester, target_power).await;
self.post_provider_connected(requester, target_power);
Ok(())
}
}

/// Common logic for after a provider has successfully connected
async fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) {
fn post_provider_connected(&mut self, requester: &'device Reg::Psu, target_power: ProviderPowerCapability) {
if self
.state
.connected_providers
Expand All @@ -111,8 +111,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
{
error!("Tracked providers set is full");
}
self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power))
.await;
self.broadcast_event(ServiceEvent::ProviderConnected(requester, target_power));
}

/// Common logic for when a provider is removed
Expand All @@ -137,7 +136,7 @@ impl<'device, Reg: Registration<'device>> Service<'device, Reg> {
self.state.current_provider_state.state = PowerState::Unlimited;
}

self.broadcast_event(ServiceEvent::ProviderDisconnected(psu)).await;
self.broadcast_event(ServiceEvent::ProviderDisconnected(psu));
true
} else {
false
Expand Down
8 changes: 4 additions & 4 deletions power-policy-service/src/service/registration.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Code related to registration with the power policy service.

use embedded_services::{event::Sender, sync::Lockable};
use embedded_services::{event::NonBlockingSender, sync::Lockable};
use power_policy_interface::{charger, psu, service::event::Event as ServiceEvent};

/// Registration trait that abstracts over various registration details.
pub trait Registration<'device> {
type Psu: Lockable<Inner: psu::Psu> + 'device;
type ServiceSender: Sender<ServiceEvent<'device, Self::Psu>>;
type ServiceSender: NonBlockingSender<ServiceEvent<'device, Self::Psu>>;
type Charger: Lockable<Inner: charger::Charger> + 'device;

/// Returns a slice to access PSU devices
Expand All @@ -22,7 +22,7 @@ pub struct ArrayRegistration<
'device,
Psu: Lockable<Inner: psu::Psu> + 'device,
const PSU_COUNT: usize,
ServiceSender: Sender<ServiceEvent<'device, Psu>>,
ServiceSender: NonBlockingSender<ServiceEvent<'device, Psu>>,
const SERVICE_SENDER_COUNT: usize,
Charger: Lockable<Inner: charger::Charger> + 'device,
const CHARGER_COUNT: usize,
Expand All @@ -39,7 +39,7 @@ impl<
'device,
Psu: Lockable<Inner: psu::Psu> + 'device,
const PSU_COUNT: usize,
ServiceSender: Sender<ServiceEvent<'device, Psu>>,
ServiceSender: NonBlockingSender<ServiceEvent<'device, Psu>>,
const SERVICE_SENDER_COUNT: usize,
Charger: Lockable<Inner: charger::Charger> + 'device,
const CHARGER_COUNT: usize,
Expand Down
34 changes: 18 additions & 16 deletions power-policy-service/tests/common/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(dead_code)]
use embassy_sync::{channel, mutex::Mutex, signal::Signal};
use embedded_batteries_async::charger::{MilliAmps, MilliVolts};
use embedded_services::{GlobalRawMutex, event::Sender, info, named::Named};
use embedded_services::{GlobalRawMutex, event::NonBlockingSender, info, named::Named};
use power_policy_interface::{
capability::{ConsumerPowerCapability, PowerCapability, ProviderFlags, ProviderPowerCapability},
charger,
Expand All @@ -18,15 +18,15 @@ pub enum FnCall {
Reset,
}

pub struct Mock<'a, S: Sender<EventData>> {
pub struct Mock<'a, S: NonBlockingSender<EventData>> {
sender: S,
fn_call: &'a Signal<GlobalRawMutex, (usize, FnCall)>,
// Internal state
pub state: State,
name: &'static str,
}

impl<'a, S: Sender<EventData>> Mock<'a, S> {
impl<'a, S: NonBlockingSender<EventData>> Mock<'a, S> {
pub fn new(name: &'static str, sender: S, fn_call: &'a Signal<GlobalRawMutex, (usize, FnCall)>) -> Self {
Self {
name,
Expand All @@ -47,21 +47,21 @@ impl<'a, S: Sender<EventData>> Mock<'a, S> {

pub async fn simulate_consumer_connection(&mut self, capability: ConsumerPowerCapability) {
self.state.attach().unwrap();
self.sender.send(EventData::Attached).await;
self.sender.try_send(EventData::Attached).unwrap();
self.state.update_consumer_power_capability(Some(capability)).unwrap();
self.sender
.send(EventData::UpdatedConsumerCapability(Some(capability)))
.await;
.try_send(EventData::UpdatedConsumerCapability(Some(capability)))
.unwrap();
}

pub async fn simulate_detach(&mut self) {
self.state.detach();
self.sender.send(EventData::Detached).await;
self.sender.try_send(EventData::Detached).unwrap();
}

pub async fn simulate_provider_connection(&mut self, capability: PowerCapability) {
self.state.attach().unwrap();
self.sender.send(EventData::Attached).await;
self.sender.try_send(EventData::Attached).unwrap();

let capability = Some(ProviderPowerCapability {
capability,
Expand All @@ -71,13 +71,13 @@ impl<'a, S: Sender<EventData>> Mock<'a, S> {
.update_requested_provider_power_capability(capability)
.unwrap();
self.sender
.send(EventData::RequestedProviderCapability(capability))
.await;
.try_send(EventData::RequestedProviderCapability(capability))
.unwrap();
}

pub async fn simulate_disconnect(&mut self) {
self.state.disconnect(true).unwrap();
self.sender.send(EventData::Disconnected).await;
self.sender.try_send(EventData::Disconnected).unwrap();
}

pub async fn simulate_update_requested_provider_power_capability(
Expand All @@ -88,12 +88,12 @@ impl<'a, S: Sender<EventData>> Mock<'a, S> {
.update_requested_provider_power_capability(capability)
.unwrap();
self.sender
.send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability))
.await
.try_send(power_policy_interface::psu::event::EventData::RequestedProviderCapability(capability))
.unwrap();
}
}

impl<'a, S: Sender<EventData>> Psu for Mock<'a, S> {
impl<'a, S: NonBlockingSender<EventData>> Psu for Mock<'a, S> {
async fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> Result<(), Error> {
info!("Connect consumer {:#?}", capability);
self.record_fn_call(FnCall::ConnectConsumer(capability));
Expand Down Expand Up @@ -121,7 +121,7 @@ impl<'a, S: Sender<EventData>> Psu for Mock<'a, S> {
}
}

impl<'a, S: Sender<EventData>> Named for Mock<'a, S> {
impl<'a, S: NonBlockingSender<EventData>> Named for Mock<'a, S> {
fn name(&self) -> &'static str {
self.name
}
Expand All @@ -146,7 +146,9 @@ impl<'a> ExampleCharger<'a> {
}

pub async fn simulate_psu_state_change(&self, psu_state: charger::PsuState) {
self.sender.send(charger::EventData::PsuStateChange(psu_state)).await;
self.sender
.try_send(charger::EventData::PsuStateChange(psu_state))
.unwrap();
}
}

Expand Down
Loading
Loading