diff --git a/conanfile.py b/conanfile.py index 4c6f6e8..bf3a320 100644 --- a/conanfile.py +++ b/conanfile.py @@ -42,7 +42,7 @@ def configure(self): self.options["*"].shared = False def requirements(self): - self.requires("viam-cpp-sdk/0.21.0") + self.requires("viam-cpp-sdk/0.37.0") self.requires("libmp3lame/3.100") self.requires("soxr/0.1.3") diff --git a/src/discovery.hpp b/src/discovery.hpp index df3c114..2328b4e 100644 --- a/src/discovery.hpp +++ b/src/discovery.hpp @@ -16,6 +16,9 @@ class AudioDiscovery : public viam::sdk::Discovery { audio::device_id::DeviceIdResolver* resolver = nullptr); std::vector discover_resources(const viam::sdk::ProtoStruct& extra) override; viam::sdk::ProtoStruct do_command(const viam::sdk::ProtoStruct& command) override; + viam::sdk::ProtoStruct get_status() override { + return {}; + } static viam::sdk::Model model; private: diff --git a/src/microphone.hpp b/src/microphone.hpp index c679485..470a7e1 100644 --- a/src/microphone.hpp +++ b/src/microphone.hpp @@ -50,6 +50,9 @@ class Microphone final : public viam::sdk::AudioIn { viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); + viam::sdk::ProtoStruct get_status() override { + return {}; + } // Restarts the stream. // Must NOT be called while holding stream_ctx_mu_. diff --git a/src/speaker.cpp b/src/speaker.cpp index 4f2df73..9021d3e 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -19,6 +19,12 @@ using audio::codec::AudioCodec; constexpr int MIN_VOLUME = 0; constexpr int MAX_VOLUME = 100; +// Distance from the buffer's lap point at which process_and_write_pcm blocks waiting for +// the callback to drain. This is what paces a faster-than-real-time producer (TTS, file +// playback) — and the size of the margin is the cushion against any delay in the callback +// advancing playback_position (scheduler jitter, USB stalls, etc.). +constexpr int BUFFER_MARGIN_MS = 50; + Speaker::Speaker(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, audio::portaudio::PortAudioInterface* pa) : viam::sdk::AudioOut(cfg.name()), pa_(pa), stream_(nullptr) { auto setup = audio::utils::setup_audio_device( @@ -296,15 +302,17 @@ void Speaker::play(std::vector const& audio_data, VIAM_SDK_LOG(debug) << "Play called, adding samples to playback buffer"; + if (audio_data.empty()) { + return; + } + if (!info) { VIAM_SDK_LOG(error) << "[Play]: Must specify audio info parameter"; throw std::invalid_argument("[Play]: Must specify audio info parameter"); } const std::string codec_str = info->codec; - - // Parse codec string to enum - const AudioCodec codec = audio::codec::parse_codec(codec_str); + AudioCodec codec = audio::codec::parse_codec(codec_str); // Detect and strip WAV header if present const uint8_t* raw_audio = audio_data.data(); @@ -320,133 +328,152 @@ void Speaker::play(std::vector const& audio_data, raw_audio_size -= audio::codec::wav_header_size; } - // decoded_buf holds converted data for non-PCM_16 codecs + // MP3 needs a stateful whole-buffer decode; the rest of the pipeline only sees PCM_16. + std::vector mp3_decoded; + if (codec == AudioCodec::MP3) { + MP3DecoderContext mp3_ctx; + decode_mp3_to_pcm16(mp3_ctx, raw_audio, raw_audio_size, mp3_decoded); + audio_sample_rate = mp3_ctx.sample_rate; + audio_num_channels = mp3_ctx.num_channels; + raw_audio = mp3_decoded.data(); + raw_audio_size = mp3_decoded.size(); + codec = AudioCodec::PCM_16; + } + + int speaker_sample_rate; + int speaker_num_channels; + std::shared_ptr playback_context; + { + std::lock_guard lock(stream_mu_); + if (!audio_context_) { + VIAM_SDK_LOG(error) << "[Play] Audio context is nullptr"; + throw std::runtime_error("Audio context is nullptr"); + } + playback_context = audio_context_; + speaker_sample_rate = stream_params_.sample_rate; + speaker_num_channels = stream_params_.num_channels; + } + + // Estimate post-resample sample count from input bytes — exact for PCM_16, ~2x conservative + // for PCM_32 variants — to bail before allocating if the audio won't fit the playback buffer. + { + const size_t pcm16_bytes_estimate = (codec == AudioCodec::PCM_16) ? raw_audio_size : raw_audio_size / 2; + const size_t input_samples_estimate = pcm16_bytes_estimate / sizeof(int16_t); + const double duration_seconds = static_cast(input_samples_estimate) / (audio_sample_rate * audio_num_channels); + if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { + throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + + " seconds); use PlayStream for longer audio"); + } + } + + const uint64_t start_position = playback_context->get_write_position(); + const size_t samples_written = process_and_write_pcm(raw_audio, + raw_audio_size, + codec, + audio_sample_rate, + audio_num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); + + wait_for_playback(playback_context, start_position, samples_written); +} + +size_t Speaker::process_and_write_pcm(const uint8_t* data, + size_t size, + AudioCodec codec, + int audio_sample_rate, + int audio_num_channels, + int speaker_sample_rate, + int speaker_num_channels, + std::shared_ptr playback_context) { + if (size == 0) { + throw std::invalid_argument("process_and_write_pcm: empty input"); + } + std::vector decoded_buf; const uint8_t* decoded_data = nullptr; size_t decoded_size = 0; - - // decode to pcm16 switch (codec) { - case AudioCodec::MP3: { - MP3DecoderContext mp3_ctx; - decode_mp3_to_pcm16(mp3_ctx, raw_audio, raw_audio_size, decoded_buf); - // For MP3, use the decoded properties from the file, not what user provided - audio_sample_rate = mp3_ctx.sample_rate; - audio_num_channels = mp3_ctx.num_channels; - break; - } case AudioCodec::PCM_32: - audio::codec::convert_pcm32_to_pcm16(raw_audio, raw_audio_size, decoded_buf); + audio::codec::convert_pcm32_to_pcm16(data, size, decoded_buf); + decoded_data = decoded_buf.data(); + decoded_size = decoded_buf.size(); break; case AudioCodec::PCM_32_FLOAT: - audio::codec::convert_float32_to_pcm16(raw_audio, raw_audio_size, decoded_buf); + audio::codec::convert_float32_to_pcm16(data, size, decoded_buf); + decoded_data = decoded_buf.data(); + decoded_size = decoded_buf.size(); break; case AudioCodec::PCM_16: - // No conversion needed — point directly at the input data - decoded_data = raw_audio; - decoded_size = raw_audio_size; + decoded_data = data; + decoded_size = size; break; default: - // Shouldn't ever get here because it will throw when converting the str to enum, - // but for safety - VIAM_SDK_LOG(error) << "Unsupported codec for playback: " << codec_str; - throw std::invalid_argument("Unsupported codec for playback"); + throw std::invalid_argument("process_and_write_pcm: unsupported codec"); } - // If we decoded into the buffer, point at it - if (!decoded_buf.empty()) { - decoded_data = decoded_buf.data(); - decoded_size = decoded_buf.size(); - } - - // Convert uint8_t bytes to int16_t samples - // PCM_16 means each sample is 2 bytes (16 bits) if (decoded_size % 2 != 0) { - VIAM_SDK_LOG(error) << "Audio data size must be even for PCM_16 format, got " << decoded_size << " bytes"; - throw std::invalid_argument("got invalid data size, cannot convert to int16"); + throw std::invalid_argument("process_and_write_pcm: PCM_16 size must be even, got " + std::to_string(decoded_size)); } - const int16_t* decoded_samples = reinterpret_cast(decoded_data); + const int16_t* samples = reinterpret_cast(decoded_data); size_t num_samples = decoded_size / sizeof(int16_t); - int speaker_sample_rate; - int speaker_num_channels; - - { - std::lock_guard lock(stream_mu_); - speaker_sample_rate = stream_params_.sample_rate; - speaker_num_channels = stream_params_.num_channels; - } - - // Convert channel count if needed (e.g. mono → stereo or stereo → mono) std::vector channel_converted; if (audio_num_channels != speaker_num_channels) { - VIAM_SDK_LOG(debug) << "Converting audio from " << audio_num_channels << " to " << speaker_num_channels << " channels"; - convert_channels(decoded_samples, num_samples, audio_num_channels, speaker_num_channels, channel_converted); - decoded_samples = channel_converted.data(); + convert_channels(samples, num_samples, audio_num_channels, speaker_num_channels, channel_converted); + samples = channel_converted.data(); num_samples = channel_converted.size(); - audio_num_channels = speaker_num_channels; } - // Resample if sample rates don't match - std::vector resampled_samples; - const int16_t* samples = decoded_samples; - size_t final_num_samples = num_samples; - int final_sample_rate = audio_sample_rate; - + std::vector resampled; if (audio_sample_rate != speaker_sample_rate) { - VIAM_SDK_LOG(info) << "resampling audio from " << audio_sample_rate << "Hz to speaker native sample rate " << speaker_sample_rate - << " Hz"; - resample_audio(audio_sample_rate, speaker_sample_rate, audio_num_channels, decoded_samples, num_samples, resampled_samples); - - // Use resampled data - samples = resampled_samples.data(); - final_num_samples = resampled_samples.size(); - final_sample_rate = speaker_sample_rate; + resample_audio(audio_sample_rate, speaker_sample_rate, speaker_num_channels, samples, num_samples, resampled); + samples = resampled.data(); + num_samples = resampled.size(); } - - // Check if audio duration exceeds playback buffer capacity - { - std::lock_guard lock(stream_mu_); - double duration_seconds = static_cast(final_num_samples) / (final_sample_rate * audio_num_channels); - if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { - VIAM_SDK_LOG(error) << "Audio duration (" << duration_seconds << " seconds) exceeds maximum playback buffer size (" - << audio::BUFFER_DURATION_SECONDS << " seconds)"; - throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + - " seconds)"); - } + if (num_samples == 0) { + throw std::invalid_argument("process_and_write_pcm: input too small to produce any output samples after resample"); } - VIAM_SDK_LOG(debug) << "Playing " << final_num_samples << " samples (" << final_num_samples * sizeof(int16_t) << " bytes)"; - - // Write samples to the audio buffer and capture context - uint64_t start_position; - std::shared_ptr playback_context; - { - std::lock_guard lock(stream_mu_); - if (!audio_context_) { - VIAM_SDK_LOG(error) << "[Play] Audio context is nullptr"; - throw std::runtime_error("Audio context is nullptr"); - } - playback_context = audio_context_; - start_position = audio_context_->get_write_position(); - - for (size_t i = 0; i < final_num_samples; i++) { - audio_context_->write_sample(samples[i]); + // Backpressure: cap how far the producer can run ahead of the callback so a faster-than- + // real-time source can't lap the read pointer and erase audio. + const uint64_t margin_samples = static_cast(speaker_sample_rate) * speaker_num_channels * BUFFER_MARGIN_MS / 1000; + const uint64_t max_ahead = static_cast(playback_context->buffer_capacity) - margin_samples; + + for (size_t i = 0; i < num_samples; ++i) { + // Use `write >= read + max_ahead` rather than `write - read >= max_ahead` so a read + // position ahead of write (only reachable in tests that pre-advance playback_position) + // doesn't underflow into a huge unsigned diff and trap us forever. + while (playback_context->get_write_position() >= playback_context->playback_position.load() + max_ahead) { + if (stop_requested_.load()) { + return i; + } + { + std::lock_guard lock(stream_mu_); + if (audio_context_ != playback_context) { + return i; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + playback_context->write_sample(samples[i]); } + return num_samples; +} - // Block until playback position catches up - VIAM_SDK_LOG(debug) << "Waiting for playback to complete..."; +void Speaker::wait_for_playback(std::shared_ptr playback_context, + uint64_t start_position, + uint64_t samples_to_drain) { uint64_t last_logged_overflow_count = 0; uint64_t last_logged_underflow_count = 0; uint64_t last_staleness_log_ns = 0; - while (playback_context->playback_position.load() - start_position < final_num_samples) { + while (playback_context->playback_position.load() - start_position < samples_to_drain) { if (stop_requested_.load()) { VIAM_SDK_LOG(debug) << "Playback stopped by stop command"; return; } - // Check if context changed (stream restarted by watchdog) PaStream* current_stream = nullptr; { std::lock_guard lock(stream_mu_); @@ -457,18 +484,18 @@ void Speaker::play(std::vector const& audio_data, current_stream = stream_; } - audio::utils::log_callback_staleness(playback_context->last_callback_time_ns, "[play]", current_stream, last_staleness_log_ns); + audio::utils::log_callback_staleness(playback_context->last_callback_time_ns, "[playback]", current_stream, last_staleness_log_ns); const uint64_t overflow_count = playback_context->output_overflow_count.load(); if (overflow_count != last_logged_overflow_count) { - VIAM_SDK_LOG(warn) << "[play] Output overflow detected — " << (overflow_count - last_logged_overflow_count) + VIAM_SDK_LOG(warn) << "[playback] Output overflow detected — " << (overflow_count - last_logged_overflow_count) << " new overflow(s), " << overflow_count << " total"; last_logged_overflow_count = overflow_count; } const uint64_t underflow_count = playback_context->output_underflow_count.load(); if (underflow_count != last_logged_underflow_count) { - VIAM_SDK_LOG(warn) << "[play] Output underflow detected — " << (underflow_count - last_logged_underflow_count) + VIAM_SDK_LOG(warn) << "[playback] Output underflow detected — " << (underflow_count - last_logged_underflow_count) << " new underflow(s), " << underflow_count << " total"; last_logged_underflow_count = underflow_count; } @@ -476,15 +503,70 @@ void Speaker::play(std::vector const& audio_data, std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - // Wait for audio pipeline to drain + // Drain the PortAudio pipeline so the caller knows the audio actually played. Skipped on + // the early-return paths above (stop / context swap) — both want to exit promptly. double drain_latency; { std::lock_guard lock(stream_mu_); drain_latency = latency_; } std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(drain_latency * 1000))); +} + +// Resampling and channel conversion run per-chunk against the source's audio_info, so chunk +// boundaries that don't align with the resampler's window can introduce minor artifacts. +void Speaker::play_stream(viam::sdk::audio_info info, + std::function>()> chunk_source, + const viam::sdk::ProtoStruct& extra) { + std::lock_guard playback_lock(playback_mu_); + stop_requested_.store(false); + + const AudioCodec source_codec = audio::codec::parse_codec(info.codec); + if (source_codec == AudioCodec::MP3) { + throw std::invalid_argument("[PlayStream] MP3 streaming is not supported; use play() for MP3"); + } + + int speaker_sample_rate; + int speaker_num_channels; + std::shared_ptr playback_context; + { + std::lock_guard lock(stream_mu_); + if (!audio_context_) { + throw std::runtime_error("[PlayStream] Audio context is nullptr"); + } + playback_context = audio_context_; + speaker_sample_rate = stream_params_.sample_rate; + speaker_num_channels = stream_params_.num_channels; + } + + const uint64_t start_position = playback_context->get_write_position(); + uint64_t total_samples_written = 0; + + while (auto chunk = chunk_source()) { + if (stop_requested_.load()) { + break; + } + { + std::lock_guard lock(stream_mu_); + if (audio_context_ != playback_context) { + return; + } + } + if (chunk->empty()) { + continue; + } + + total_samples_written += process_and_write_pcm(chunk->data(), + chunk->size(), + source_codec, + info.sample_rate_hz, + info.num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); + } - VIAM_SDK_LOG(debug) << "Audio playback complete"; + wait_for_playback(playback_context, start_position, total_samples_written); } viam::sdk::audio_properties Speaker::get_properties(const vsdk::ProtoStruct& extra) { diff --git a/src/speaker.hpp b/src/speaker.hpp index 7c9c5ff..6cbfbb1 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -10,6 +11,7 @@ #include #include #include +#include "audio_codec.hpp" #include "audio_stream.hpp" #include "audio_utils.hpp" #include "portaudio.h" @@ -54,8 +56,15 @@ class Speaker final : public viam::sdk::AudioOut { void play(std::vector const& audio_data, boost::optional info, const viam::sdk::ProtoStruct& extra); + void play_stream(viam::sdk::audio_info info, + std::function>()> chunk_source, + const viam::sdk::ProtoStruct& extra) override; + viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); + viam::sdk::ProtoStruct get_status() override { + return {}; + } // Member variables double latency_; @@ -98,6 +107,25 @@ class Speaker final : public viam::sdk::AudioOut { private: void restart_stalled_stream(const std::shared_ptr& playback_context); + + // Decode (PCM_16/32/32_FLOAT), channel-convert, resample, and write into the playback + // context's buffer. Writes are paced so the producer can't run more than buffer_capacity + // samples ahead of the callback; this propagates backpressure up through chunk_source. + // Returns the number of samples actually written — equal to the decoded input size on a + // full write, or a partial count if stop_requested_ fired or the stream context was + // swapped mid-write. Caller must hold playback_mu_. + size_t process_and_write_pcm(const uint8_t* data, + size_t size, + audio::codec::AudioCodec codec, + int audio_sample_rate, + int audio_num_channels, + int speaker_sample_rate, + int speaker_num_channels, + std::shared_ptr playback_context); + + void wait_for_playback(std::shared_ptr playback_context, + uint64_t start_position, + uint64_t samples_to_drain); }; } // namespace speaker diff --git a/test/speaker_test.cpp b/test/speaker_test.cpp index 6c5e922..f85b234 100644 --- a/test/speaker_test.cpp +++ b/test/speaker_test.cpp @@ -1005,6 +1005,487 @@ TEST_F(SpeakerTest, PlayPCM16WithWavHeader) { } +// play_stream tests + +TEST_F(SpeakerTest, PlayStream_RejectsMP3) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + viam::sdk::audio_info info{viam::sdk::audio_codecs::MP3, sample_rate, num_channels}; + ProtoStruct extra{}; + + auto chunk_source = []() -> boost::optional> { + return boost::none; + }; + + EXPECT_THROW(speaker.play_stream(info, chunk_source, extra), std::invalid_argument); +} + +TEST_F(SpeakerTest, PlayStream_EmptySourceCompletes) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + auto chunk_source = []() -> boost::optional> { + return boost::none; + }; + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), 0u); +} + +TEST_F(SpeakerTest, PlayStream_PCM16_MultipleChunks) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int samples_per_chunk = 100; + const int num_chunks = 3; + const int total_samples = samples_per_chunk * num_chunks; + + std::vector expected(total_samples); + for (int i = 0; i < total_samples; i++) { + expected[i] = static_cast(i + 1); + } + + int chunk_index = 0; + auto chunk_source = [&]() -> boost::optional> { + if (chunk_index >= num_chunks) { + return boost::none; + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), + expected.data() + chunk_index * samples_per_chunk, + chunk.size()); + chunk_index++; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + // Pre-advance playback position so wait_for_playback exits immediately once all chunks land. + speaker.audio_context_->playback_position.store(total_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(chunk_index, num_chunks); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(total_samples)); + + std::vector read_buffer(total_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), total_samples, read_pos); + EXPECT_EQ(samples_read, total_samples); + for (int i = 0; i < total_samples; i++) { + EXPECT_EQ(read_buffer[i], expected[i]); + } +} + +TEST_F(SpeakerTest, PlayStream_BackpressureBlocksProducerWhenBufferFull) { + // Small sample_rate keeps buffer_capacity tractable and the test fast. + // buffer_capacity = sample_rate * num_channels * BUFFER_DURATION_SECONDS = 30000. + // BUFFER_MARGIN_MS = 50 → margin_samples = 50, max_ahead = 29950. + const int sample_rate = 1000; + const int num_channels = 1; + const int buffer_margin_ms = 50; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const uint64_t buffer_capacity = static_cast(speaker.audio_context_->buffer_capacity); + const uint64_t margin_samples = + static_cast(sample_rate) * num_channels * buffer_margin_ms / 1000; + const uint64_t max_ahead = buffer_capacity - margin_samples; + + // Total samples exceed buffer_capacity, forcing the producer to block at least once + // while waiting for the consumer to advance playback_position. + const size_t samples_per_chunk = 5000; + const size_t num_chunks = 10; + const uint64_t total_samples = static_cast(samples_per_chunk * num_chunks); + ASSERT_GT(total_samples, buffer_capacity); + + std::atomic chunks_consumed{0}; + auto chunk_source = [&]() -> boost::optional> { + const size_t idx = chunks_consumed.load(); + if (idx >= num_chunks) { + return boost::none; + } + chunks_consumed.fetch_add(1); + return std::vector(samples_per_chunk * sizeof(int16_t), 0); + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + std::thread producer([&]() { + speaker.play_stream(info, chunk_source, extra); + }); + + auto wait_until = [](const std::function& predicate, std::chrono::milliseconds timeout) { + const auto deadline = std::chrono::steady_clock::now() + timeout; + while (std::chrono::steady_clock::now() < deadline) { + if (predicate()) return true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return predicate(); + }; + + // Phase 1: producer fills the buffer up to max_ahead while playback_position is 0. + const bool reached_cap = wait_until( + [&]() { return speaker.audio_context_->get_write_position() >= max_ahead; }, + std::chrono::seconds(1)); + ASSERT_TRUE(reached_cap) << "producer never reached max_ahead"; + + // Core invariant: write must never exceed read + max_ahead. + EXPECT_LE(speaker.audio_context_->get_write_position(), max_ahead); + + // With buffer full, producer must not have consumed all chunks. + EXPECT_LT(chunks_consumed.load(), num_chunks); + + // Confirm producer is genuinely blocked — write_pos stays put across a sleep. + const uint64_t write_pos_snapshot = speaker.audio_context_->get_write_position(); + const size_t chunks_consumed_snapshot = chunks_consumed.load(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), write_pos_snapshot) + << "producer kept writing while buffer was full"; + EXPECT_EQ(chunks_consumed.load(), chunks_consumed_snapshot) + << "producer pulled new chunks while blocked"; + + // Phase 2: advance playback_position; producer must resume. + speaker.audio_context_->playback_position.store(max_ahead); + const bool resumed = wait_until( + [&]() { return chunks_consumed.load() > chunks_consumed_snapshot; }, + std::chrono::seconds(1)); + EXPECT_TRUE(resumed) << "producer did not resume after playback_position advanced"; + + // Drain everything so wait_for_playback exits and play_stream returns cleanly. + speaker.audio_context_->playback_position.store(total_samples); + producer.join(); + + EXPECT_EQ(chunks_consumed.load(), num_chunks); + EXPECT_EQ(speaker.audio_context_->get_write_position(), total_samples); +} + +TEST_F(SpeakerTest, PlayStream_PCM32_DecodesChunk) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int num_samples = 100; + std::vector expected(num_samples); + for (int i = 0; i < num_samples; i++) { + expected[i] = static_cast(i * 100); + } + + std::vector pcm32_chunk; + audio::codec::convert_pcm16_to_pcm32(expected.data(), num_samples, pcm32_chunk); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return pcm32_chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_32, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(num_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + + std::vector read_buffer(num_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), num_samples, read_pos); + EXPECT_EQ(samples_read, num_samples); + for (int i = 0; i < num_samples; i++) { + EXPECT_EQ(read_buffer[i], expected[i]); + } +} + +TEST_F(SpeakerTest, PlayStream_PCM32Float_DecodesChunk) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int num_samples = 100; + std::vector expected(num_samples); + for (int i = 0; i < num_samples; i++) { + expected[i] = static_cast(i * 100); + } + + std::vector float_chunk; + audio::codec::convert_pcm16_to_float32(expected.data(), num_samples, float_chunk); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return float_chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_32_FLOAT, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(num_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + + std::vector read_buffer(num_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), num_samples, read_pos); + EXPECT_EQ(samples_read, num_samples); + for (int i = 0; i < num_samples; i++) { + EXPECT_NEAR(read_buffer[i], expected[i], 1); + } +} + +TEST_F(SpeakerTest, PlayStream_ChannelConversion) { + // Speaker is mono, chunks come in as stereo PCM16 — process_and_write_pcm should downmix. + const int sample_rate = 48000; + const int speaker_channels = 1; + const int audio_channels = 2; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(speaker_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int stereo_samples = 100; + std::vector stereo(stereo_samples); + for (int i = 0; i < stereo_samples; i++) { + stereo[i] = static_cast(i % 1000); + } + std::vector chunk(stereo_samples * sizeof(int16_t)); + std::memcpy(chunk.data(), stereo.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, audio_channels}; + ProtoStruct extra{}; + + const int expected_mono_samples = stereo_samples / 2; + speaker.audio_context_->playback_position.store(expected_mono_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(expected_mono_samples)); +} + +TEST_F(SpeakerTest, PlayStream_Resamples) { + const int speaker_rate = 48000; + const int audio_rate = 44100; + const int num_channels = 2; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(speaker_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int duration_ms = 50; + const int num_samples = (audio_rate * duration_ms / 1000) * num_channels; + std::vector samples(num_samples); + for (int i = 0; i < num_samples; i++) { + samples[i] = static_cast(i % 1000); + } + std::vector chunk(num_samples * sizeof(int16_t)); + std::memcpy(chunk.data(), samples.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, audio_rate, num_channels}; + ProtoStruct extra{}; + + const int expected_resampled = (num_samples * speaker_rate) / audio_rate; + speaker.audio_context_->playback_position.store(expected_resampled); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(expected_resampled)); +} + +TEST_F(SpeakerTest, PlayStream_StopRequestedBreaksLoop) { + // Verify a mid-stream stop_requested causes the loop to exit after the current chunk + // without consuming the remaining chunks. + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int samples_per_chunk = 100; + std::vector chunk_samples(samples_per_chunk); + for (int i = 0; i < samples_per_chunk; i++) { + chunk_samples[i] = static_cast(i); + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), chunk_samples.data(), chunk.size()); + + int chunks_pulled = 0; + auto chunk_source = [&]() -> boost::optional> { + chunks_pulled++; + if (chunks_pulled == 1) { + return chunk; + } + // After delivering the first chunk, request a stop. The loop should break + // on the next iteration before processing this second chunk. + speaker.stop_requested_.store(true); + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + // Only the first chunk should be written before stop interrupts. + speaker.audio_context_->playback_position.store(samples_per_chunk); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(chunks_pulled, 2); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(samples_per_chunk)); +} + +TEST_F(SpeakerTest, PlayStream_ResetsStopRequestedOnEntry) { + // play_stream() must clear a previously-set stop_requested_ so a prior stop command + // doesn't poison the next stream. + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + speaker.stop_requested_.store(true); + + const int samples_per_chunk = 50; + std::vector chunk_samples(samples_per_chunk); + for (int i = 0; i < samples_per_chunk; i++) { + chunk_samples[i] = static_cast(i + 1); + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), chunk_samples.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(samples_per_chunk); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(samples_per_chunk)); +} + + // Watchdog: when the speaker callback stops firing, the background watcher should // detect the staleness on its next poll and replace audio_context_ with a fresh one. // We force the stale state by manually setting last_callback_time_ns to a timestamp