From ee7b45017a633ee47c40b2e5c8337ba6cbb7d787 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 09:28:04 -0500 Subject: [PATCH 01/13] merge --- src/speaker.cpp | 270 +++++++++++++++++++++++++++++++----------------- src/speaker.hpp | 26 +++++ 2 files changed, 199 insertions(+), 97 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 4f2df73..a0f7a25 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -302,9 +302,7 @@ void Speaker::play(std::vector const& audio_data, } const std::string codec_str = info->codec; - - // Parse codec string to enum - const AudioCodec codec = audio::codec::parse_codec(codec_str); + AudioCodec codec = audio::codec::parse_codec(codec_str); // Detect and strip WAV header if present const uint8_t* raw_audio = audio_data.data(); @@ -320,133 +318,149 @@ void Speaker::play(std::vector const& audio_data, raw_audio_size -= audio::codec::wav_header_size; } - // decoded_buf holds converted data for non-PCM_16 codecs + // MP3 requires whole-buffer decode (stateful decoder, sample rate / channels come from + // the stream header). After decoding, the rest of the pipeline treats it as PCM_16. + std::vector mp3_decoded; + if (codec == AudioCodec::MP3) { + MP3DecoderContext mp3_ctx; + decode_mp3_to_pcm16(mp3_ctx, raw_audio, raw_audio_size, mp3_decoded); + audio_sample_rate = mp3_ctx.sample_rate; + audio_num_channels = mp3_ctx.num_channels; + raw_audio = mp3_decoded.data(); + raw_audio_size = mp3_decoded.size(); + codec = AudioCodec::PCM_16; + } + + int speaker_sample_rate; + int speaker_num_channels; + std::shared_ptr playback_context; + { + std::lock_guard lock(stream_mu_); + if (!audio_context_) { + VIAM_SDK_LOG(error) << "[Play] Audio context is nullptr"; + throw std::runtime_error("Audio context is nullptr"); + } + playback_context = audio_context_; + speaker_sample_rate = stream_params_.sample_rate; + speaker_num_channels = stream_params_.num_channels; + } + + // Reject up-front if the audio would exceed the playback buffer. Estimate the post-resample + // sample count from the input bytes — exact for PCM_16, close enough to detect "too long" + // for PCM_32 variants (input is 2x the byte count of equivalent PCM_16). + { + const size_t pcm16_bytes_estimate = (codec == AudioCodec::PCM_16) ? raw_audio_size : raw_audio_size / 2; + const size_t input_samples_estimate = pcm16_bytes_estimate / sizeof(int16_t); + const double duration_seconds = + static_cast(input_samples_estimate) / (audio_sample_rate * audio_num_channels); + if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { + VIAM_SDK_LOG(error) << "Audio duration (" << duration_seconds << " seconds) exceeds maximum playback buffer size (" + << audio::BUFFER_DURATION_SECONDS << " seconds); use PlayStream for longer audio"; + throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + + " seconds); use PlayStream for longer audio"); + } + } + + const uint64_t start_position = playback_context->get_write_position(); + const size_t samples_written = process_and_write_pcm(raw_audio, + raw_audio_size, + codec, + audio_sample_rate, + audio_num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); + + VIAM_SDK_LOG(debug) << "Waiting for playback to complete..."; + wait_for_playback(playback_context, start_position, samples_written); + + // Wait for audio pipeline to drain + std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(latency_ * 1000))); + VIAM_SDK_LOG(debug) << "Audio playback complete"; +} + +size_t Speaker::process_and_write_pcm(const uint8_t* data, + size_t size, + AudioCodec codec, + int audio_sample_rate, + int audio_num_channels, + int speaker_sample_rate, + int speaker_num_channels, + std::shared_ptr playback_context) { + // Decode to PCM_16 (stateless codecs only — caller handles MP3 ahead of time). std::vector decoded_buf; const uint8_t* decoded_data = nullptr; size_t decoded_size = 0; - - // decode to pcm16 switch (codec) { - case AudioCodec::MP3: { - MP3DecoderContext mp3_ctx; - decode_mp3_to_pcm16(mp3_ctx, raw_audio, raw_audio_size, decoded_buf); - // For MP3, use the decoded properties from the file, not what user provided - audio_sample_rate = mp3_ctx.sample_rate; - audio_num_channels = mp3_ctx.num_channels; - break; - } case AudioCodec::PCM_32: - audio::codec::convert_pcm32_to_pcm16(raw_audio, raw_audio_size, decoded_buf); + audio::codec::convert_pcm32_to_pcm16(data, size, decoded_buf); + decoded_data = decoded_buf.data(); + decoded_size = decoded_buf.size(); break; case AudioCodec::PCM_32_FLOAT: - audio::codec::convert_float32_to_pcm16(raw_audio, raw_audio_size, decoded_buf); + audio::codec::convert_float32_to_pcm16(data, size, decoded_buf); + decoded_data = decoded_buf.data(); + decoded_size = decoded_buf.size(); break; case AudioCodec::PCM_16: - // No conversion needed — point directly at the input data - decoded_data = raw_audio; - decoded_size = raw_audio_size; + decoded_data = data; + decoded_size = size; break; default: - // Shouldn't ever get here because it will throw when converting the str to enum, - // but for safety - VIAM_SDK_LOG(error) << "Unsupported codec for playback: " << codec_str; - throw std::invalid_argument("Unsupported codec for playback"); - } - - // If we decoded into the buffer, point at it - if (!decoded_buf.empty()) { - decoded_data = decoded_buf.data(); - decoded_size = decoded_buf.size(); + throw std::invalid_argument("process_and_write_pcm: unsupported codec"); } - // Convert uint8_t bytes to int16_t samples - // PCM_16 means each sample is 2 bytes (16 bits) if (decoded_size % 2 != 0) { - VIAM_SDK_LOG(error) << "Audio data size must be even for PCM_16 format, got " << decoded_size << " bytes"; - throw std::invalid_argument("got invalid data size, cannot convert to int16"); + VIAM_SDK_LOG(error) << "PCM_16 size must be even, got " << decoded_size; + throw std::invalid_argument("invalid PCM_16 data size"); } - const int16_t* decoded_samples = reinterpret_cast(decoded_data); + const int16_t* samples = reinterpret_cast(decoded_data); size_t num_samples = decoded_size / sizeof(int16_t); - int speaker_sample_rate; - int speaker_num_channels; - - { - std::lock_guard lock(stream_mu_); - speaker_sample_rate = stream_params_.sample_rate; - speaker_num_channels = stream_params_.num_channels; - } - - // Convert channel count if needed (e.g. mono → stereo or stereo → mono) + // Channel conversion. std::vector channel_converted; if (audio_num_channels != speaker_num_channels) { VIAM_SDK_LOG(debug) << "Converting audio from " << audio_num_channels << " to " << speaker_num_channels << " channels"; - convert_channels(decoded_samples, num_samples, audio_num_channels, speaker_num_channels, channel_converted); - decoded_samples = channel_converted.data(); + convert_channels(samples, num_samples, audio_num_channels, speaker_num_channels, channel_converted); + samples = channel_converted.data(); num_samples = channel_converted.size(); - audio_num_channels = speaker_num_channels; } - // Resample if sample rates don't match - std::vector resampled_samples; - const int16_t* samples = decoded_samples; - size_t final_num_samples = num_samples; - int final_sample_rate = audio_sample_rate; - + // Resample. + std::vector resampled; if (audio_sample_rate != speaker_sample_rate) { - VIAM_SDK_LOG(info) << "resampling audio from " << audio_sample_rate << "Hz to speaker native sample rate " << speaker_sample_rate - << " Hz"; - resample_audio(audio_sample_rate, speaker_sample_rate, audio_num_channels, decoded_samples, num_samples, resampled_samples); - - // Use resampled data - samples = resampled_samples.data(); - final_num_samples = resampled_samples.size(); - final_sample_rate = speaker_sample_rate; - } - - // Check if audio duration exceeds playback buffer capacity - { - std::lock_guard lock(stream_mu_); - double duration_seconds = static_cast(final_num_samples) / (final_sample_rate * audio_num_channels); - if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { - VIAM_SDK_LOG(error) << "Audio duration (" << duration_seconds << " seconds) exceeds maximum playback buffer size (" - << audio::BUFFER_DURATION_SECONDS << " seconds)"; - throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + - " seconds)"); - } + VIAM_SDK_LOG(info) << "resampling audio from " << audio_sample_rate << "Hz to speaker native sample rate " + << speaker_sample_rate << " Hz"; + resample_audio(audio_sample_rate, speaker_sample_rate, speaker_num_channels, samples, num_samples, resampled); + samples = resampled.data(); + num_samples = resampled.size(); } - VIAM_SDK_LOG(debug) << "Playing " << final_num_samples << " samples (" << final_num_samples * sizeof(int16_t) << " bytes)"; - - // Write samples to the audio buffer and capture context - uint64_t start_position; - std::shared_ptr playback_context; { std::lock_guard lock(stream_mu_); - if (!audio_context_) { - VIAM_SDK_LOG(error) << "[Play] Audio context is nullptr"; - throw std::runtime_error("Audio context is nullptr"); + if (audio_context_ != playback_context) { + VIAM_SDK_LOG(debug) << "process_and_write_pcm: context changed, aborting write"; + return 0; } - playback_context = audio_context_; - start_position = audio_context_->get_write_position(); - - for (size_t i = 0; i < final_num_samples; i++) { + for (size_t i = 0; i < num_samples; i++) { audio_context_->write_sample(samples[i]); } } + return num_samples; +} - // Block until playback position catches up - VIAM_SDK_LOG(debug) << "Waiting for playback to complete..."; +void Speaker::wait_for_playback(std::shared_ptr playback_context, + uint64_t start_position, + uint64_t samples_to_drain) { uint64_t last_logged_overflow_count = 0; uint64_t last_logged_underflow_count = 0; uint64_t last_staleness_log_ns = 0; - while (playback_context->playback_position.load() - start_position < final_num_samples) { + while (playback_context->playback_position.load() - start_position < samples_to_drain) { if (stop_requested_.load()) { VIAM_SDK_LOG(debug) << "Playback stopped by stop command"; return; } - // Check if context changed (stream restarted by watchdog) PaStream* current_stream = nullptr; { std::lock_guard lock(stream_mu_); @@ -457,34 +471,96 @@ void Speaker::play(std::vector const& audio_data, current_stream = stream_; } - audio::utils::log_callback_staleness(playback_context->last_callback_time_ns, "[play]", current_stream, last_staleness_log_ns); + audio::utils::log_callback_staleness(playback_context->last_callback_time_ns, "[playback]", current_stream, last_staleness_log_ns); const uint64_t overflow_count = playback_context->output_overflow_count.load(); if (overflow_count != last_logged_overflow_count) { - VIAM_SDK_LOG(warn) << "[play] Output overflow detected — " << (overflow_count - last_logged_overflow_count) + VIAM_SDK_LOG(warn) << "[playback] Output overflow detected — " << (overflow_count - last_logged_overflow_count) << " new overflow(s), " << overflow_count << " total"; last_logged_overflow_count = overflow_count; } const uint64_t underflow_count = playback_context->output_underflow_count.load(); if (underflow_count != last_logged_underflow_count) { - VIAM_SDK_LOG(warn) << "[play] Output underflow detected — " << (underflow_count - last_logged_underflow_count) + VIAM_SDK_LOG(warn) << "[playback] Output underflow detected — " << (underflow_count - last_logged_underflow_count) << " new underflow(s), " << underflow_count << " total"; last_logged_underflow_count = underflow_count; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } +} - // Wait for audio pipeline to drain - double drain_latency; +/** + * Stream audio chunks into the speaker for continuous playback. + * + * Chunks are pulled from `chunk_source` and written into the circular buffer as they arrive. + * Playback runs concurrently via the PortAudio callback. The method blocks until the source + * signals end-of-stream (returns boost::none) and the buffer has drained. + * + * PoC scope: supports stateless codecs (PCM_16, PCM_32, PCM_32_FLOAT). MP3 is not supported + * here because per-chunk decoding requires stateful decoder context across chunks. Channel + * count and sample rate conversion are applied per-chunk against the init audio_info. + * + * @throws std::invalid_argument if codec is unsupported for streaming. + */ +void Speaker::play_stream(viam::sdk::audio_info info, + std::function>()> chunk_source, + const viam::sdk::ProtoStruct& extra) { + std::lock_guard playback_lock(playback_mu_); + stop_requested_.store(false); + + VIAM_SDK_LOG(debug) << "PlayStream called, codec=" << info.codec << " rate=" << info.sample_rate_hz + << " channels=" << info.num_channels; + + const AudioCodec codec = audio::codec::parse_codec(info.codec); + if (codec == AudioCodec::MP3) { + throw std::invalid_argument("[PlayStream]: MP3 codec not supported for streaming"); + } + + int speaker_sample_rate; + int speaker_num_channels; + std::shared_ptr playback_context; { std::lock_guard lock(stream_mu_); - drain_latency = latency_; + if (!audio_context_) { + VIAM_SDK_LOG(error) << "[PlayStream] Audio context is nullptr"; + throw std::runtime_error("Audio context is nullptr"); + } + playback_context = audio_context_; + speaker_sample_rate = stream_params_.sample_rate; + speaker_num_channels = stream_params_.num_channels; } - std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(drain_latency * 1000))); - VIAM_SDK_LOG(debug) << "Audio playback complete"; + const uint64_t start_position = playback_context->get_write_position(); + uint64_t total_samples_written = 0; + + while (auto chunk = chunk_source()) { + if (stop_requested_.load()) { + VIAM_SDK_LOG(debug) << "[PlayStream] Stop requested, ending stream"; + break; + } + // Per-chunk resampling is stateless across chunks; minor boundary artifacts possible. + const size_t written = process_and_write_pcm(chunk->data(), + chunk->size(), + codec, + info.sample_rate_hz, + info.num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); + if (written == 0) { + return; + } + total_samples_written += written; + } + + VIAM_SDK_LOG(debug) << "[PlayStream] Source exhausted, " << total_samples_written + << " samples written, waiting for drain"; + wait_for_playback(playback_context, start_position, total_samples_written); + + std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(latency_ * 1000))); + VIAM_SDK_LOG(debug) << "[PlayStream] Stream playback complete"; } viam::sdk::audio_properties Speaker::get_properties(const vsdk::ProtoStruct& extra) { diff --git a/src/speaker.hpp b/src/speaker.hpp index 7c9c5ff..4d088b0 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -10,6 +11,7 @@ #include #include #include +#include "audio_codec.hpp" #include "audio_stream.hpp" #include "audio_utils.hpp" #include "portaudio.h" @@ -54,6 +56,10 @@ class Speaker final : public viam::sdk::AudioOut { void play(std::vector const& audio_data, boost::optional info, const viam::sdk::ProtoStruct& extra); + void play_stream(viam::sdk::audio_info info, + std::function>()> chunk_source, + const viam::sdk::ProtoStruct& extra) override; + viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); @@ -98,6 +104,26 @@ class Speaker final : public viam::sdk::AudioOut { private: void restart_stalled_stream(const std::shared_ptr& playback_context); + + private: + // Decode a buffer of PCM bytes (PCM_16, PCM_32, or PCM_32_FLOAT), apply channel conversion + // and resampling against the speaker's current native format, and write the result into the + // playback context's circular buffer. Returns the number of samples written. + // Caller must already hold playback_mu_. Acquires stream_mu_ for the write. + size_t process_and_write_pcm(const uint8_t* data, + size_t size, + audio::codec::AudioCodec codec, + int audio_sample_rate, + int audio_num_channels, + int speaker_sample_rate, + int speaker_num_channels, + std::shared_ptr playback_context); + + // Block until `samples_to_drain` samples have been played past `start_position`. Honors + // stop_requested_ and detects audio-context swap. Logs overflow/underflow. + void wait_for_playback(std::shared_ptr playback_context, + uint64_t start_position, + uint64_t samples_to_drain); }; } // namespace speaker From 92acf6d7d0a70dbb117a9f43f9bbdf411b3cc2d9 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Wed, 20 May 2026 10:58:20 -0400 Subject: [PATCH 02/13] add tests --- test/speaker_test.cpp | 388 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 388 insertions(+) diff --git a/test/speaker_test.cpp b/test/speaker_test.cpp index 6c5e922..48b8e59 100644 --- a/test/speaker_test.cpp +++ b/test/speaker_test.cpp @@ -1005,6 +1005,394 @@ TEST_F(SpeakerTest, PlayPCM16WithWavHeader) { } +// play_stream tests + +TEST_F(SpeakerTest, PlayStream_RejectsMP3) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + viam::sdk::audio_info info{viam::sdk::audio_codecs::MP3, sample_rate, num_channels}; + ProtoStruct extra{}; + + auto chunk_source = []() -> boost::optional> { + return boost::none; + }; + + EXPECT_THROW(speaker.play_stream(info, chunk_source, extra), std::invalid_argument); +} + +TEST_F(SpeakerTest, PlayStream_EmptySourceCompletes) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + auto chunk_source = []() -> boost::optional> { + return boost::none; + }; + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), 0u); +} + +TEST_F(SpeakerTest, PlayStream_PCM16_MultipleChunks) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int samples_per_chunk = 100; + const int num_chunks = 3; + const int total_samples = samples_per_chunk * num_chunks; + + std::vector expected(total_samples); + for (int i = 0; i < total_samples; i++) { + expected[i] = static_cast(i + 1); + } + + int chunk_index = 0; + auto chunk_source = [&]() -> boost::optional> { + if (chunk_index >= num_chunks) { + return boost::none; + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), + expected.data() + chunk_index * samples_per_chunk, + chunk.size()); + chunk_index++; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + // Pre-advance playback position so wait_for_playback exits immediately once all chunks land. + speaker.audio_context_->playback_position.store(total_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(chunk_index, num_chunks); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(total_samples)); + + std::vector read_buffer(total_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), total_samples, read_pos); + EXPECT_EQ(samples_read, total_samples); + for (int i = 0; i < total_samples; i++) { + EXPECT_EQ(read_buffer[i], expected[i]); + } +} + +TEST_F(SpeakerTest, PlayStream_PCM32_DecodesChunk) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int num_samples = 100; + std::vector expected(num_samples); + for (int i = 0; i < num_samples; i++) { + expected[i] = static_cast(i * 100); + } + + std::vector pcm32_chunk; + audio::codec::convert_pcm16_to_pcm32(expected.data(), num_samples, pcm32_chunk); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return pcm32_chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_32, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(num_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + + std::vector read_buffer(num_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), num_samples, read_pos); + EXPECT_EQ(samples_read, num_samples); + for (int i = 0; i < num_samples; i++) { + EXPECT_EQ(read_buffer[i], expected[i]); + } +} + +TEST_F(SpeakerTest, PlayStream_PCM32Float_DecodesChunk) { + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int num_samples = 100; + std::vector expected(num_samples); + for (int i = 0; i < num_samples; i++) { + expected[i] = static_cast(i * 100); + } + + std::vector float_chunk; + audio::codec::convert_pcm16_to_float32(expected.data(), num_samples, float_chunk); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return float_chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_32_FLOAT, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(num_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + + std::vector read_buffer(num_samples); + uint64_t read_pos = 0; + int samples_read = speaker.audio_context_->read_samples(read_buffer.data(), num_samples, read_pos); + EXPECT_EQ(samples_read, num_samples); + for (int i = 0; i < num_samples; i++) { + EXPECT_NEAR(read_buffer[i], expected[i], 1); + } +} + +TEST_F(SpeakerTest, PlayStream_ChannelConversion) { + // Speaker is mono, chunks come in as stereo PCM16 — process_and_write_pcm should downmix. + const int sample_rate = 48000; + const int speaker_channels = 1; + const int audio_channels = 2; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(speaker_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int stereo_samples = 100; + std::vector stereo(stereo_samples); + for (int i = 0; i < stereo_samples; i++) { + stereo[i] = static_cast(i % 1000); + } + std::vector chunk(stereo_samples * sizeof(int16_t)); + std::memcpy(chunk.data(), stereo.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, audio_channels}; + ProtoStruct extra{}; + + const int expected_mono_samples = stereo_samples / 2; + speaker.audio_context_->playback_position.store(expected_mono_samples); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(expected_mono_samples)); +} + +TEST_F(SpeakerTest, PlayStream_Resamples) { + const int speaker_rate = 48000; + const int audio_rate = 44100; + const int num_channels = 2; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(speaker_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int duration_ms = 50; + const int num_samples = (audio_rate * duration_ms / 1000) * num_channels; + std::vector samples(num_samples); + for (int i = 0; i < num_samples; i++) { + samples[i] = static_cast(i % 1000); + } + std::vector chunk(num_samples * sizeof(int16_t)); + std::memcpy(chunk.data(), samples.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, audio_rate, num_channels}; + ProtoStruct extra{}; + + const int expected_resampled = (num_samples * speaker_rate) / audio_rate; + speaker.audio_context_->playback_position.store(expected_resampled); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(expected_resampled)); +} + +TEST_F(SpeakerTest, PlayStream_StopRequestedBreaksLoop) { + // Verify a mid-stream stop_requested causes the loop to exit after the current chunk + // without consuming the remaining chunks. + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const int samples_per_chunk = 100; + std::vector chunk_samples(samples_per_chunk); + for (int i = 0; i < samples_per_chunk; i++) { + chunk_samples[i] = static_cast(i); + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), chunk_samples.data(), chunk.size()); + + int chunks_pulled = 0; + auto chunk_source = [&]() -> boost::optional> { + chunks_pulled++; + if (chunks_pulled == 1) { + return chunk; + } + // After delivering the first chunk, request a stop. The loop should break + // on the next iteration before processing this second chunk. + speaker.stop_requested_.store(true); + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + // Only the first chunk should be written before stop interrupts. + speaker.audio_context_->playback_position.store(samples_per_chunk); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(chunks_pulled, 2); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(samples_per_chunk)); +} + +TEST_F(SpeakerTest, PlayStream_ResetsStopRequestedOnEntry) { + // play_stream() must clear a previously-set stop_requested_ so a prior stop command + // doesn't poison the next stream. + const int sample_rate = 48000; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + speaker.stop_requested_.store(true); + + const int samples_per_chunk = 50; + std::vector chunk_samples(samples_per_chunk); + for (int i = 0; i < samples_per_chunk; i++) { + chunk_samples[i] = static_cast(i + 1); + } + std::vector chunk(samples_per_chunk * sizeof(int16_t)); + std::memcpy(chunk.data(), chunk_samples.data(), chunk.size()); + + bool delivered = false; + auto chunk_source = [&]() -> boost::optional> { + if (delivered) { + return boost::none; + } + delivered = true; + return chunk; + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + speaker.audio_context_->playback_position.store(samples_per_chunk); + + EXPECT_NO_THROW(speaker.play_stream(info, chunk_source, extra)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), static_cast(samples_per_chunk)); +} + + // Watchdog: when the speaker callback stops firing, the background watcher should // detect the staleness on its next poll and replace audio_context_ with a fresh one. // We force the stale state by manually setting last_callback_time_ns to a timestamp From a1049f6dca4cba34d9163d5b83ca203720575c54 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 11:03:16 -0500 Subject: [PATCH 03/13] fixes --- src/speaker.cpp | 79 +++++++++++++++++++++---------------------------- src/speaker.hpp | 10 ++----- 2 files changed, 36 insertions(+), 53 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index a0f7a25..8371bac 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -318,8 +318,7 @@ void Speaker::play(std::vector const& audio_data, raw_audio_size -= audio::codec::wav_header_size; } - // MP3 requires whole-buffer decode (stateful decoder, sample rate / channels come from - // the stream header). After decoding, the rest of the pipeline treats it as PCM_16. + // MP3 needs a stateful whole-buffer decode; the rest of the pipeline only sees PCM_16. std::vector mp3_decoded; if (codec == AudioCodec::MP3) { MP3DecoderContext mp3_ctx; @@ -345,17 +344,14 @@ void Speaker::play(std::vector const& audio_data, speaker_num_channels = stream_params_.num_channels; } - // Reject up-front if the audio would exceed the playback buffer. Estimate the post-resample - // sample count from the input bytes — exact for PCM_16, close enough to detect "too long" - // for PCM_32 variants (input is 2x the byte count of equivalent PCM_16). + // Estimate post-resample sample count from input bytes — exact for PCM_16, ~2x conservative + // for PCM_32 variants — to bail before allocating if the audio won't fit the playback buffer. { const size_t pcm16_bytes_estimate = (codec == AudioCodec::PCM_16) ? raw_audio_size : raw_audio_size / 2; const size_t input_samples_estimate = pcm16_bytes_estimate / sizeof(int16_t); const double duration_seconds = static_cast(input_samples_estimate) / (audio_sample_rate * audio_num_channels); if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { - VIAM_SDK_LOG(error) << "Audio duration (" << duration_seconds << " seconds) exceeds maximum playback buffer size (" - << audio::BUFFER_DURATION_SECONDS << " seconds); use PlayStream for longer audio"; throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + " seconds); use PlayStream for longer audio"); } @@ -371,12 +367,7 @@ void Speaker::play(std::vector const& audio_data, speaker_num_channels, playback_context); - VIAM_SDK_LOG(debug) << "Waiting for playback to complete..."; wait_for_playback(playback_context, start_position, samples_written); - - // Wait for audio pipeline to drain - std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(latency_ * 1000))); - VIAM_SDK_LOG(debug) << "Audio playback complete"; } size_t Speaker::process_and_write_pcm(const uint8_t* data, @@ -387,7 +378,12 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, int speaker_sample_rate, int speaker_num_channels, std::shared_ptr playback_context) { - // Decode to PCM_16 (stateless codecs only — caller handles MP3 ahead of time). + // A 0 return is reserved to mean "the stream context was swapped out"; reject empty + // input up front so callers can rely on that contract. + if (size == 0) { + throw std::invalid_argument("process_and_write_pcm: empty input"); + } + std::vector decoded_buf; const uint8_t* decoded_data = nullptr; size_t decoded_size = 0; @@ -411,36 +407,35 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, } if (decoded_size % 2 != 0) { - VIAM_SDK_LOG(error) << "PCM_16 size must be even, got " << decoded_size; - throw std::invalid_argument("invalid PCM_16 data size"); + throw std::invalid_argument("process_and_write_pcm: PCM_16 size must be even, got " + std::to_string(decoded_size)); } const int16_t* samples = reinterpret_cast(decoded_data); size_t num_samples = decoded_size / sizeof(int16_t); - // Channel conversion. std::vector channel_converted; if (audio_num_channels != speaker_num_channels) { - VIAM_SDK_LOG(debug) << "Converting audio from " << audio_num_channels << " to " << speaker_num_channels << " channels"; convert_channels(samples, num_samples, audio_num_channels, speaker_num_channels, channel_converted); samples = channel_converted.data(); num_samples = channel_converted.size(); } - // Resample. std::vector resampled; if (audio_sample_rate != speaker_sample_rate) { - VIAM_SDK_LOG(info) << "resampling audio from " << audio_sample_rate << "Hz to speaker native sample rate " - << speaker_sample_rate << " Hz"; resample_audio(audio_sample_rate, speaker_sample_rate, speaker_num_channels, samples, num_samples, resampled); samples = resampled.data(); num_samples = resampled.size(); } + // Guarantees the post-decode pipeline preserves the "non-empty in → non-empty out" invariant, + // so the 0 return below can only mean a context swap. + if (num_samples == 0) { + throw std::invalid_argument("process_and_write_pcm: input too small to produce any output samples after resample"); + } + { std::lock_guard lock(stream_mu_); if (audio_context_ != playback_context) { - VIAM_SDK_LOG(debug) << "process_and_write_pcm: context changed, aborting write"; return 0; } for (size_t i = 0; i < num_samples; i++) { @@ -489,30 +484,26 @@ void Speaker::wait_for_playback(std::shared_ptr play std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + + // Drain the PortAudio pipeline so the caller knows the audio actually played. Skipped on + // the early-return paths above (stop / context swap) — both want to exit promptly. + double drain_latency; + { + std::lock_guard lock(stream_mu_); + drain_latency = latency_; + } + std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(drain_latency * 1000))); } -/** - * Stream audio chunks into the speaker for continuous playback. - * - * Chunks are pulled from `chunk_source` and written into the circular buffer as they arrive. - * Playback runs concurrently via the PortAudio callback. The method blocks until the source - * signals end-of-stream (returns boost::none) and the buffer has drained. - * - * PoC scope: supports stateless codecs (PCM_16, PCM_32, PCM_32_FLOAT). MP3 is not supported - * here because per-chunk decoding requires stateful decoder context across chunks. Channel - * count and sample rate conversion are applied per-chunk against the init audio_info. - * - * @throws std::invalid_argument if codec is unsupported for streaming. - */ +// MP3 is not supported here because per-chunk decoding needs decoder state across chunks. +// Resampling and channel conversion run per-chunk against the init audio_info, so chunk +// boundaries that don't align with the resampler's window can introduce minor artifacts. void Speaker::play_stream(viam::sdk::audio_info info, std::function>()> chunk_source, const viam::sdk::ProtoStruct& extra) { std::lock_guard playback_lock(playback_mu_); stop_requested_.store(false); - VIAM_SDK_LOG(debug) << "PlayStream called, codec=" << info.codec << " rate=" << info.sample_rate_hz - << " channels=" << info.num_channels; - const AudioCodec codec = audio::codec::parse_codec(info.codec); if (codec == AudioCodec::MP3) { throw std::invalid_argument("[PlayStream]: MP3 codec not supported for streaming"); @@ -524,8 +515,7 @@ void Speaker::play_stream(viam::sdk::audio_info info, { std::lock_guard lock(stream_mu_); if (!audio_context_) { - VIAM_SDK_LOG(error) << "[PlayStream] Audio context is nullptr"; - throw std::runtime_error("Audio context is nullptr"); + throw std::runtime_error("[PlayStream] Audio context is nullptr"); } playback_context = audio_context_; speaker_sample_rate = stream_params_.sample_rate; @@ -537,10 +527,11 @@ void Speaker::play_stream(viam::sdk::audio_info info, while (auto chunk = chunk_source()) { if (stop_requested_.load()) { - VIAM_SDK_LOG(debug) << "[PlayStream] Stop requested, ending stream"; break; } - // Per-chunk resampling is stateless across chunks; minor boundary artifacts possible. + if (chunk->empty()) { + continue; + } const size_t written = process_and_write_pcm(chunk->data(), chunk->size(), codec, @@ -550,17 +541,13 @@ void Speaker::play_stream(viam::sdk::audio_info info, speaker_num_channels, playback_context); if (written == 0) { + // Stream context was swapped; bail without draining. return; } total_samples_written += written; } - VIAM_SDK_LOG(debug) << "[PlayStream] Source exhausted, " << total_samples_written - << " samples written, waiting for drain"; wait_for_playback(playback_context, start_position, total_samples_written); - - std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(latency_ * 1000))); - VIAM_SDK_LOG(debug) << "[PlayStream] Stream playback complete"; } viam::sdk::audio_properties Speaker::get_properties(const vsdk::ProtoStruct& extra) { diff --git a/src/speaker.hpp b/src/speaker.hpp index 4d088b0..1b78eeb 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -105,11 +105,9 @@ class Speaker final : public viam::sdk::AudioOut { private: void restart_stalled_stream(const std::shared_ptr& playback_context); - private: - // Decode a buffer of PCM bytes (PCM_16, PCM_32, or PCM_32_FLOAT), apply channel conversion - // and resampling against the speaker's current native format, and write the result into the - // playback context's circular buffer. Returns the number of samples written. - // Caller must already hold playback_mu_. Acquires stream_mu_ for the write. + // Decode (PCM_16/32/32_FLOAT), channel-convert, resample, and write into the playback + // context's buffer. Returns samples written, or 0 if the stream context was swapped + // mid-call (the only path that produces a 0 return). Caller must hold playback_mu_. size_t process_and_write_pcm(const uint8_t* data, size_t size, audio::codec::AudioCodec codec, @@ -119,8 +117,6 @@ class Speaker final : public viam::sdk::AudioOut { int speaker_num_channels, std::shared_ptr playback_context); - // Block until `samples_to_drain` samples have been played past `start_position`. Honors - // stop_requested_ and detects audio-context swap. Logs overflow/underflow. void wait_for_playback(std::shared_ptr playback_context, uint64_t start_position, uint64_t samples_to_drain); From 25319ecc1aab7ab1bd44721053752d3c276a4f42 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 11:03:53 -0500 Subject: [PATCH 04/13] lint --- src/speaker.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 8371bac..86b9864 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -349,8 +349,7 @@ void Speaker::play(std::vector const& audio_data, { const size_t pcm16_bytes_estimate = (codec == AudioCodec::PCM_16) ? raw_audio_size : raw_audio_size / 2; const size_t input_samples_estimate = pcm16_bytes_estimate / sizeof(int16_t); - const double duration_seconds = - static_cast(input_samples_estimate) / (audio_sample_rate * audio_num_channels); + const double duration_seconds = static_cast(input_samples_estimate) / (audio_sample_rate * audio_num_channels); if (duration_seconds > audio::BUFFER_DURATION_SECONDS) { throw std::invalid_argument("Audio file too long for playback buffer (max " + std::to_string(audio::BUFFER_DURATION_SECONDS) + " seconds); use PlayStream for longer audio"); From b575241d1ebdf0d79559e37739744cf1307cbfa9 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 15:11:07 -0500 Subject: [PATCH 05/13] upgrade sdk --- conanfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conanfile.py b/conanfile.py index 4c6f6e8..bf3a320 100644 --- a/conanfile.py +++ b/conanfile.py @@ -42,7 +42,7 @@ def configure(self): self.options["*"].shared = False def requirements(self): - self.requires("viam-cpp-sdk/0.21.0") + self.requires("viam-cpp-sdk/0.37.0") self.requires("libmp3lame/3.100") self.requires("soxr/0.1.3") From d1e78af0c80418ca7dc74951a21e711d23a2d4eb Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 15:33:19 -0500 Subject: [PATCH 06/13] add get status --- src/discovery.hpp | 1 + src/microphone.hpp | 1 + src/speaker.hpp | 1 + 3 files changed, 3 insertions(+) diff --git a/src/discovery.hpp b/src/discovery.hpp index df3c114..21c9f7b 100644 --- a/src/discovery.hpp +++ b/src/discovery.hpp @@ -16,6 +16,7 @@ class AudioDiscovery : public viam::sdk::Discovery { audio::device_id::DeviceIdResolver* resolver = nullptr); std::vector discover_resources(const viam::sdk::ProtoStruct& extra) override; viam::sdk::ProtoStruct do_command(const viam::sdk::ProtoStruct& command) override; + viam::sdk::ProtoStruct get_status() override { return {}; } static viam::sdk::Model model; private: diff --git a/src/microphone.hpp b/src/microphone.hpp index c679485..601f664 100644 --- a/src/microphone.hpp +++ b/src/microphone.hpp @@ -50,6 +50,7 @@ class Microphone final : public viam::sdk::AudioIn { viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); + viam::sdk::ProtoStruct get_status() override { return {}; } // Restarts the stream. // Must NOT be called while holding stream_ctx_mu_. diff --git a/src/speaker.hpp b/src/speaker.hpp index 1b78eeb..f8351a4 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -62,6 +62,7 @@ class Speaker final : public viam::sdk::AudioOut { viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); + viam::sdk::ProtoStruct get_status() override { return {}; } // Member variables double latency_; From 4d7da0badcff834406859fc0e59d5125a79c5420 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 26 May 2026 15:50:36 -0500 Subject: [PATCH 07/13] lint --- src/discovery.hpp | 4 +++- src/microphone.hpp | 4 +++- src/speaker.hpp | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/discovery.hpp b/src/discovery.hpp index 21c9f7b..2328b4e 100644 --- a/src/discovery.hpp +++ b/src/discovery.hpp @@ -16,7 +16,9 @@ class AudioDiscovery : public viam::sdk::Discovery { audio::device_id::DeviceIdResolver* resolver = nullptr); std::vector discover_resources(const viam::sdk::ProtoStruct& extra) override; viam::sdk::ProtoStruct do_command(const viam::sdk::ProtoStruct& command) override; - viam::sdk::ProtoStruct get_status() override { return {}; } + viam::sdk::ProtoStruct get_status() override { + return {}; + } static viam::sdk::Model model; private: diff --git a/src/microphone.hpp b/src/microphone.hpp index 601f664..470a7e1 100644 --- a/src/microphone.hpp +++ b/src/microphone.hpp @@ -50,7 +50,9 @@ class Microphone final : public viam::sdk::AudioIn { viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); - viam::sdk::ProtoStruct get_status() override { return {}; } + viam::sdk::ProtoStruct get_status() override { + return {}; + } // Restarts the stream. // Must NOT be called while holding stream_ctx_mu_. diff --git a/src/speaker.hpp b/src/speaker.hpp index f8351a4..7558d7e 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -62,7 +62,9 @@ class Speaker final : public viam::sdk::AudioOut { viam::sdk::audio_properties get_properties(const viam::sdk::ProtoStruct& extra); std::vector get_geometries(const viam::sdk::ProtoStruct& extra); - viam::sdk::ProtoStruct get_status() override { return {}; } + viam::sdk::ProtoStruct get_status() override { + return {}; + } // Member variables double latency_; From c46c46ba38c701f26173c86436dc014bf47768e4 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Wed, 27 May 2026 11:51:29 -0500 Subject: [PATCH 08/13] fix test --- src/speaker.cpp | 71 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 11 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 86b9864..f8bcf8d 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -296,6 +296,10 @@ void Speaker::play(std::vector const& audio_data, VIAM_SDK_LOG(debug) << "Play called, adding samples to playback buffer"; + if (audio_data.empty()) { + return; + } + if (!info) { VIAM_SDK_LOG(error) << "[Play]: Must specify audio info parameter"; throw std::invalid_argument("[Play]: Must specify audio info parameter"); @@ -494,19 +498,19 @@ void Speaker::wait_for_playback(std::shared_ptr play std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(drain_latency * 1000))); } -// MP3 is not supported here because per-chunk decoding needs decoder state across chunks. -// Resampling and channel conversion run per-chunk against the init audio_info, so chunk +// Resampling and channel conversion run per-chunk against the source's audio_info, so chunk // boundaries that don't align with the resampler's window can introduce minor artifacts. +// +// MP3 streaming carries a decoder context across chunks (LAME buffers partial frames +// internally) and derives sample_rate/num_channels from the bitstream header rather than the +// init audio_info — the init values for MP3 are ignored. void Speaker::play_stream(viam::sdk::audio_info info, std::function>()> chunk_source, const viam::sdk::ProtoStruct& extra) { std::lock_guard playback_lock(playback_mu_); stop_requested_.store(false); - const AudioCodec codec = audio::codec::parse_codec(info.codec); - if (codec == AudioCodec::MP3) { - throw std::invalid_argument("[PlayStream]: MP3 codec not supported for streaming"); - } + const AudioCodec source_codec = audio::codec::parse_codec(info.codec); int speaker_sample_rate; int speaker_num_channels; @@ -521,6 +525,12 @@ void Speaker::play_stream(viam::sdk::audio_info info, speaker_num_channels = stream_params_.num_channels; } + // MP3 path: decode each chunk through a persistent LAME context. Once the first frame's + // header lands, treat the resulting PCM as PCM_16 for the rest of the pipeline. + MP3DecoderContext mp3_ctx; + const bool is_mp3 = (source_codec == AudioCodec::MP3); + const AudioCodec downstream_codec = is_mp3 ? AudioCodec::PCM_16 : source_codec; + const uint64_t start_position = playback_context->get_write_position(); uint64_t total_samples_written = 0; @@ -531,11 +541,30 @@ void Speaker::play_stream(viam::sdk::audio_info info, if (chunk->empty()) { continue; } - const size_t written = process_and_write_pcm(chunk->data(), - chunk->size(), - codec, - info.sample_rate_hz, - info.num_channels, + + const uint8_t* pcm_data = chunk->data(); + size_t pcm_size = chunk->size(); + std::vector mp3_decoded; + int chunk_sample_rate = info.sample_rate_hz; + int chunk_num_channels = info.num_channels; + + if (is_mp3) { + decode_mp3_chunk(mp3_ctx, chunk->data(), chunk->size(), mp3_decoded); + if (mp3_decoded.empty()) { + // LAME hasn't produced a frame yet (still buffering, or ID3 spans chunks). Skip. + continue; + } + pcm_data = mp3_decoded.data(); + pcm_size = mp3_decoded.size(); + chunk_sample_rate = mp3_ctx.sample_rate; + chunk_num_channels = mp3_ctx.num_channels; + } + + const size_t written = process_and_write_pcm(pcm_data, + pcm_size, + downstream_codec, + chunk_sample_rate, + chunk_num_channels, speaker_sample_rate, speaker_num_channels, playback_context); @@ -546,6 +575,26 @@ void Speaker::play_stream(viam::sdk::audio_info info, total_samples_written += written; } + // Drain any frames LAME still has buffered, then push them through the pipeline. + if (is_mp3 && !stop_requested_.load()) { + std::vector tail; + flush_mp3_decoder(mp3_ctx, tail); + if (!tail.empty()) { + const size_t written = process_and_write_pcm(tail.data(), + tail.size(), + AudioCodec::PCM_16, + mp3_ctx.sample_rate, + mp3_ctx.num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); + if (written == 0) { + return; + } + total_samples_written += written; + } + } + wait_for_playback(playback_context, start_position, total_samples_written); } From 8f92d5e1de15cd0a483c4cb276a31d5e2e8ec763 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Wed, 27 May 2026 12:35:45 -0500 Subject: [PATCH 09/13] fix mp3 --- src/speaker.cpp | 61 +++++++------------------------------------------ 1 file changed, 8 insertions(+), 53 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index f8bcf8d..816eb32 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -500,10 +500,6 @@ void Speaker::wait_for_playback(std::shared_ptr play // Resampling and channel conversion run per-chunk against the source's audio_info, so chunk // boundaries that don't align with the resampler's window can introduce minor artifacts. -// -// MP3 streaming carries a decoder context across chunks (LAME buffers partial frames -// internally) and derives sample_rate/num_channels from the bitstream header rather than the -// init audio_info — the init values for MP3 are ignored. void Speaker::play_stream(viam::sdk::audio_info info, std::function>()> chunk_source, const viam::sdk::ProtoStruct& extra) { @@ -511,6 +507,9 @@ void Speaker::play_stream(viam::sdk::audio_info info, stop_requested_.store(false); const AudioCodec source_codec = audio::codec::parse_codec(info.codec); + if (source_codec == AudioCodec::MP3) { + throw std::invalid_argument("[PlayStream] MP3 streaming is not supported; use play() for MP3"); + } int speaker_sample_rate; int speaker_num_channels; @@ -525,12 +524,6 @@ void Speaker::play_stream(viam::sdk::audio_info info, speaker_num_channels = stream_params_.num_channels; } - // MP3 path: decode each chunk through a persistent LAME context. Once the first frame's - // header lands, treat the resulting PCM as PCM_16 for the rest of the pipeline. - MP3DecoderContext mp3_ctx; - const bool is_mp3 = (source_codec == AudioCodec::MP3); - const AudioCodec downstream_codec = is_mp3 ? AudioCodec::PCM_16 : source_codec; - const uint64_t start_position = playback_context->get_write_position(); uint64_t total_samples_written = 0; @@ -542,29 +535,11 @@ void Speaker::play_stream(viam::sdk::audio_info info, continue; } - const uint8_t* pcm_data = chunk->data(); - size_t pcm_size = chunk->size(); - std::vector mp3_decoded; - int chunk_sample_rate = info.sample_rate_hz; - int chunk_num_channels = info.num_channels; - - if (is_mp3) { - decode_mp3_chunk(mp3_ctx, chunk->data(), chunk->size(), mp3_decoded); - if (mp3_decoded.empty()) { - // LAME hasn't produced a frame yet (still buffering, or ID3 spans chunks). Skip. - continue; - } - pcm_data = mp3_decoded.data(); - pcm_size = mp3_decoded.size(); - chunk_sample_rate = mp3_ctx.sample_rate; - chunk_num_channels = mp3_ctx.num_channels; - } - - const size_t written = process_and_write_pcm(pcm_data, - pcm_size, - downstream_codec, - chunk_sample_rate, - chunk_num_channels, + const size_t written = process_and_write_pcm(chunk->data(), + chunk->size(), + source_codec, + info.sample_rate_hz, + info.num_channels, speaker_sample_rate, speaker_num_channels, playback_context); @@ -575,26 +550,6 @@ void Speaker::play_stream(viam::sdk::audio_info info, total_samples_written += written; } - // Drain any frames LAME still has buffered, then push them through the pipeline. - if (is_mp3 && !stop_requested_.load()) { - std::vector tail; - flush_mp3_decoder(mp3_ctx, tail); - if (!tail.empty()) { - const size_t written = process_and_write_pcm(tail.data(), - tail.size(), - AudioCodec::PCM_16, - mp3_ctx.sample_rate, - mp3_ctx.num_channels, - speaker_sample_rate, - speaker_num_channels, - playback_context); - if (written == 0) { - return; - } - total_samples_written += written; - } - } - wait_for_playback(playback_context, start_position, total_samples_written); } From 9b972b23a307dfab8938f9a32ddd21323db0e677 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Mon, 1 Jun 2026 11:36:30 -0400 Subject: [PATCH 10/13] wait if buffer filling too quickly --- src/speaker.cpp | 32 +++++++++++++++++++++++++------- src/speaker.hpp | 6 ++++-- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 816eb32..c076a56 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -19,6 +19,12 @@ using audio::codec::AudioCodec; constexpr int MIN_VOLUME = 0; constexpr int MAX_VOLUME = 100; +// Distance from the buffer's lap point at which process_and_write_pcm blocks waiting for +// the callback to drain. This is what paces a faster-than-real-time producer (TTS, file +// playback) — and the size of the margin is the cushion against any delay in the callback +// advancing playback_position (scheduler jitter, USB stalls, etc.). +constexpr int BUFFER_MARGIN_MS = 50; + Speaker::Speaker(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, audio::portaudio::PortAudioInterface* pa) : viam::sdk::AudioOut(cfg.name()), pa_(pa), stream_(nullptr) { auto setup = audio::utils::setup_audio_device( @@ -436,14 +442,26 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, throw std::invalid_argument("process_and_write_pcm: input too small to produce any output samples after resample"); } - { - std::lock_guard lock(stream_mu_); - if (audio_context_ != playback_context) { - return 0; - } - for (size_t i = 0; i < num_samples; i++) { - audio_context_->write_sample(samples[i]); + // Backpressure: cap how far the producer can run ahead of the callback so a faster-than- + // real-time source can't lap the read pointer and erase audio. + const uint64_t margin_samples = + static_cast(speaker_sample_rate) * speaker_num_channels * BUFFER_MARGIN_MS / 1000; + const uint64_t max_ahead = static_cast(playback_context->buffer_capacity) - margin_samples; + + for (size_t i = 0; i < num_samples; ++i) { + while (playback_context->get_write_position() - playback_context->playback_position.load() >= max_ahead) { + if (stop_requested_.load()) { + return i; + } + { + std::lock_guard lock(stream_mu_); + if (audio_context_ != playback_context) { + return 0; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + playback_context->write_sample(samples[i]); } return num_samples; } diff --git a/src/speaker.hpp b/src/speaker.hpp index 7558d7e..2080d12 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -109,8 +109,10 @@ class Speaker final : public viam::sdk::AudioOut { void restart_stalled_stream(const std::shared_ptr& playback_context); // Decode (PCM_16/32/32_FLOAT), channel-convert, resample, and write into the playback - // context's buffer. Returns samples written, or 0 if the stream context was swapped - // mid-call (the only path that produces a 0 return). Caller must hold playback_mu_. + // context's buffer. Writes are paced so the producer can't run more than buffer_capacity + // samples ahead of the callback; this propagates backpressure up through chunk_source. + // Returns num_samples on success, a partial count if stop_requested_ was set mid-write, + // or 0 if the stream context was swapped mid-call. Caller must hold playback_mu_. size_t process_and_write_pcm(const uint8_t* data, size_t size, audio::codec::AudioCodec codec, From 6ab898ac3e742da20a6e78557c74d0381ef4fdb1 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Mon, 1 Jun 2026 11:41:05 -0400 Subject: [PATCH 11/13] lint --- src/speaker.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index c076a56..9e65fc0 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -444,8 +444,7 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, // Backpressure: cap how far the producer can run ahead of the callback so a faster-than- // real-time source can't lap the read pointer and erase audio. - const uint64_t margin_samples = - static_cast(speaker_sample_rate) * speaker_num_channels * BUFFER_MARGIN_MS / 1000; + const uint64_t margin_samples = static_cast(speaker_sample_rate) * speaker_num_channels * BUFFER_MARGIN_MS / 1000; const uint64_t max_ahead = static_cast(playback_context->buffer_capacity) - margin_samples; for (size_t i = 0; i < num_samples; ++i) { From 912b163c9f35d278b9caf8c645e8bb4a3f4683c6 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 2 Jun 2026 15:01:20 -0400 Subject: [PATCH 12/13] refactor --- src/speaker.cpp | 34 +++++++--------- src/speaker.hpp | 5 ++- test/speaker_test.cpp | 93 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 21 deletions(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 9e65fc0..394a3ac 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -387,8 +387,6 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, int speaker_sample_rate, int speaker_num_channels, std::shared_ptr playback_context) { - // A 0 return is reserved to mean "the stream context was swapped out"; reject empty - // input up front so callers can rely on that contract. if (size == 0) { throw std::invalid_argument("process_and_write_pcm: empty input"); } @@ -435,9 +433,6 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, samples = resampled.data(); num_samples = resampled.size(); } - - // Guarantees the post-decode pipeline preserves the "non-empty in → non-empty out" invariant, - // so the 0 return below can only mean a context swap. if (num_samples == 0) { throw std::invalid_argument("process_and_write_pcm: input too small to produce any output samples after resample"); } @@ -455,7 +450,7 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, { std::lock_guard lock(stream_mu_); if (audio_context_ != playback_context) { - return 0; + return i; } } std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -548,23 +543,24 @@ void Speaker::play_stream(viam::sdk::audio_info info, if (stop_requested_.load()) { break; } + { + std::lock_guard lock(stream_mu_); + if (audio_context_ != playback_context) { + return; + } + } if (chunk->empty()) { continue; } - const size_t written = process_and_write_pcm(chunk->data(), - chunk->size(), - source_codec, - info.sample_rate_hz, - info.num_channels, - speaker_sample_rate, - speaker_num_channels, - playback_context); - if (written == 0) { - // Stream context was swapped; bail without draining. - return; - } - total_samples_written += written; + total_samples_written += process_and_write_pcm(chunk->data(), + chunk->size(), + source_codec, + info.sample_rate_hz, + info.num_channels, + speaker_sample_rate, + speaker_num_channels, + playback_context); } wait_for_playback(playback_context, start_position, total_samples_written); diff --git a/src/speaker.hpp b/src/speaker.hpp index 2080d12..6cbfbb1 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -111,8 +111,9 @@ class Speaker final : public viam::sdk::AudioOut { // Decode (PCM_16/32/32_FLOAT), channel-convert, resample, and write into the playback // context's buffer. Writes are paced so the producer can't run more than buffer_capacity // samples ahead of the callback; this propagates backpressure up through chunk_source. - // Returns num_samples on success, a partial count if stop_requested_ was set mid-write, - // or 0 if the stream context was swapped mid-call. Caller must hold playback_mu_. + // Returns the number of samples actually written — equal to the decoded input size on a + // full write, or a partial count if stop_requested_ fired or the stream context was + // swapped mid-write. Caller must hold playback_mu_. size_t process_and_write_pcm(const uint8_t* data, size_t size, audio::codec::AudioCodec codec, diff --git a/test/speaker_test.cpp b/test/speaker_test.cpp index 48b8e59..f85b234 100644 --- a/test/speaker_test.cpp +++ b/test/speaker_test.cpp @@ -1114,6 +1114,99 @@ TEST_F(SpeakerTest, PlayStream_PCM16_MultipleChunks) { } } +TEST_F(SpeakerTest, PlayStream_BackpressureBlocksProducerWhenBufferFull) { + // Small sample_rate keeps buffer_capacity tractable and the test fast. + // buffer_capacity = sample_rate * num_channels * BUFFER_DURATION_SECONDS = 30000. + // BUFFER_MARGIN_MS = 50 → margin_samples = 50, max_ahead = 29950. + const int sample_rate = 1000; + const int num_channels = 1; + const int buffer_margin_ms = 50; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + const uint64_t buffer_capacity = static_cast(speaker.audio_context_->buffer_capacity); + const uint64_t margin_samples = + static_cast(sample_rate) * num_channels * buffer_margin_ms / 1000; + const uint64_t max_ahead = buffer_capacity - margin_samples; + + // Total samples exceed buffer_capacity, forcing the producer to block at least once + // while waiting for the consumer to advance playback_position. + const size_t samples_per_chunk = 5000; + const size_t num_chunks = 10; + const uint64_t total_samples = static_cast(samples_per_chunk * num_chunks); + ASSERT_GT(total_samples, buffer_capacity); + + std::atomic chunks_consumed{0}; + auto chunk_source = [&]() -> boost::optional> { + const size_t idx = chunks_consumed.load(); + if (idx >= num_chunks) { + return boost::none; + } + chunks_consumed.fetch_add(1); + return std::vector(samples_per_chunk * sizeof(int16_t), 0); + }; + + viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, sample_rate, num_channels}; + ProtoStruct extra{}; + + std::thread producer([&]() { + speaker.play_stream(info, chunk_source, extra); + }); + + auto wait_until = [](const std::function& predicate, std::chrono::milliseconds timeout) { + const auto deadline = std::chrono::steady_clock::now() + timeout; + while (std::chrono::steady_clock::now() < deadline) { + if (predicate()) return true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return predicate(); + }; + + // Phase 1: producer fills the buffer up to max_ahead while playback_position is 0. + const bool reached_cap = wait_until( + [&]() { return speaker.audio_context_->get_write_position() >= max_ahead; }, + std::chrono::seconds(1)); + ASSERT_TRUE(reached_cap) << "producer never reached max_ahead"; + + // Core invariant: write must never exceed read + max_ahead. + EXPECT_LE(speaker.audio_context_->get_write_position(), max_ahead); + + // With buffer full, producer must not have consumed all chunks. + EXPECT_LT(chunks_consumed.load(), num_chunks); + + // Confirm producer is genuinely blocked — write_pos stays put across a sleep. + const uint64_t write_pos_snapshot = speaker.audio_context_->get_write_position(); + const size_t chunks_consumed_snapshot = chunks_consumed.load(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(speaker.audio_context_->get_write_position(), write_pos_snapshot) + << "producer kept writing while buffer was full"; + EXPECT_EQ(chunks_consumed.load(), chunks_consumed_snapshot) + << "producer pulled new chunks while blocked"; + + // Phase 2: advance playback_position; producer must resume. + speaker.audio_context_->playback_position.store(max_ahead); + const bool resumed = wait_until( + [&]() { return chunks_consumed.load() > chunks_consumed_snapshot; }, + std::chrono::seconds(1)); + EXPECT_TRUE(resumed) << "producer did not resume after playback_position advanced"; + + // Drain everything so wait_for_playback exits and play_stream returns cleanly. + speaker.audio_context_->playback_position.store(total_samples); + producer.join(); + + EXPECT_EQ(chunks_consumed.load(), num_chunks); + EXPECT_EQ(speaker.audio_context_->get_write_position(), total_samples); +} + TEST_F(SpeakerTest, PlayStream_PCM32_DecodesChunk) { const int sample_rate = 48000; const int num_channels = 1; From 3a17ff6be69ef749283f4762acecb854d29536fa Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 2 Jun 2026 15:44:46 -0400 Subject: [PATCH 13/13] fix --- src/speaker.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/speaker.cpp b/src/speaker.cpp index 394a3ac..9021d3e 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -443,7 +443,10 @@ size_t Speaker::process_and_write_pcm(const uint8_t* data, const uint64_t max_ahead = static_cast(playback_context->buffer_capacity) - margin_samples; for (size_t i = 0; i < num_samples; ++i) { - while (playback_context->get_write_position() - playback_context->playback_position.load() >= max_ahead) { + // Use `write >= read + max_ahead` rather than `write - read >= max_ahead` so a read + // position ahead of write (only reachable in tests that pre-advance playback_position) + // doesn't underflow into a huge unsigned diff and trap us forever. + while (playback_context->get_write_position() >= playback_context->playback_position.load() + max_ahead) { if (stop_requested_.load()) { return i; }