Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions crates/nodes/src/containers/ogg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,3 +875,180 @@ pub fn register_ogg_nodes(registry: &mut NodeRegistry) {
);
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Test modules add lint suppressions without required rationale

The repository’s AGENTS.md explicitly requires lint suppressions to include a rationale comment, but the newly added test modules introduce #[allow(clippy::unwrap_used, clippy::expect_used)] without explaining why the suppression is necessary. The same pattern is also present in the added test modules at crates/nodes/src/video/pixel_ops/blit.rs:1310 and crates/nodes/src/video/pixel_ops/convert.rs:705; in the blit/convert tests the suppression appears unnecessary because those modules do not use unwrap or expect. This weakens the lint discipline that the repo mandates for PRs.

Prompt for agents
Audit the new test-module lint suppressions in crates/nodes/src/containers/ogg.rs, crates/nodes/src/video/pixel_ops/blit.rs, and crates/nodes/src/video/pixel_ops/convert.rs. Remove suppressions that are not needed, and for any suppression that remains, add a short rationale comment explaining why the test intentionally uses unwrap/expect rather than refactoring the assertions. This is required by the AGENTS.md linting discipline rule.
Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This follows the established pattern throughout the crate — all existing test modules (nv_av1, openh264, vaapi_av1, vaapi_h264, vp9, vulkan_video, pixel_convert, colorbars, moq/push) use the same #[allow(clippy::unwrap_used, clippy::expect_used)] without inline rationale. The suppression is necessary because test code intentionally uses unwrap() to fail-fast on unexpected errors rather than propagating Result.

mod tests {
use super::*;

#[test]
fn ogg_muxer_config_defaults() {
let config = OggMuxerConfig::default();
assert_eq!(config.stream_serial, 0);
assert!(matches!(config.codec, OggMuxerCodec::Opus));
assert_eq!(config.channels, 1);
assert_eq!(config.chunk_size, 65536);
}

#[test]
fn ogg_muxer_config_deserialization() {
let json = r#"{"stream_serial": 42, "channels": 2, "chunk_size": 4096}"#;
let config: OggMuxerConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.stream_serial, 42);
assert_eq!(config.channels, 2);
assert_eq!(config.chunk_size, 4096);
}

#[test]
fn ogg_demuxer_config_defaults() {
let config: OggDemuxerConfig = serde_json::from_str("{}").unwrap();
let _ = config;
}

#[test]
fn ogg_muxer_content_type() {
let node = OggMuxerNode::new(OggMuxerConfig::default());
assert_eq!(node.content_type(), Some("audio/ogg".to_string()));
}

#[test]
fn ogg_muxer_pins() {
let node = OggMuxerNode::new(OggMuxerConfig::default());
let inputs = node.input_pins();
assert_eq!(inputs.len(), 1);
assert_eq!(inputs[0].name, "in");

let outputs = node.output_pins();
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].name, "out");
assert!(matches!(outputs[0].produces_type, PacketType::Binary));
}

#[test]
fn ogg_demuxer_pins() {
let node = OggDemuxerNode::new(OggDemuxerConfig::default());
let inputs = node.input_pins();
assert_eq!(inputs.len(), 1);
assert_eq!(inputs[0].name, "in");

let outputs = node.output_pins();
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].name, "out");
assert!(matches!(
outputs[0].produces_type,
PacketType::EncodedAudio(EncodedAudioFormat { codec: AudioCodec::Opus, .. })
));
}

#[tokio::test]
async fn ogg_mux_produces_valid_ogg_output() {
use crate::test_utils::{create_test_binary_packet, create_test_context};

let (input_tx, input_rx) = tokio::sync::mpsc::channel(10);
let mut inputs = std::collections::HashMap::new();
inputs.insert("in".to_string(), input_rx);

let (context, mock_sender, _state_rx) = create_test_context(inputs, 1);

let config = OggMuxerConfig { chunk_size: 128, ..OggMuxerConfig::default() };
let node = Box::new(OggMuxerNode::new(config));

let handle = tokio::spawn(async move { node.run(context).await });

for _ in 0..5 {
input_tx.send(create_test_binary_packet(vec![0xAB; 160])).await.unwrap();
}
drop(input_tx);

handle.await.unwrap().unwrap();

let packets = mock_sender.collect_packets().await;
assert!(!packets.is_empty(), "muxer should produce output");

let mut ogg_data = Vec::new();
for (_, _, pkt) in &packets {
if let Packet::Binary { data, content_type, .. } = pkt {
assert_eq!(content_type.as_deref(), Some("audio/ogg"));
ogg_data.extend_from_slice(data);
}
}

assert!(ogg_data.len() >= 4);
assert_eq!(&ogg_data[..4], b"OggS");
}

#[tokio::test]
async fn ogg_mux_demux_round_trip() {
use crate::test_utils::{create_test_binary_packet, create_test_context};

let (input_tx, input_rx) = tokio::sync::mpsc::channel(10);
let mut inputs = std::collections::HashMap::new();
inputs.insert("in".to_string(), input_rx);

let (context, mock_sender, _state_rx) = create_test_context(inputs, 1);

let config = OggMuxerConfig { chunk_size: 128, ..OggMuxerConfig::default() };
let node = Box::new(OggMuxerNode::new(config));

let handle = tokio::spawn(async move { node.run(context).await });

let original_payloads: Vec<Vec<u8>> = (0..3).map(|i| vec![0x10 + i; 160]).collect();

for payload in &original_payloads {
input_tx.send(create_test_binary_packet(payload.clone())).await.unwrap();
}
drop(input_tx);

handle.await.unwrap().unwrap();

let muxed_packets = mock_sender.collect_packets().await;
let mut ogg_data = Vec::new();
for (_, _, pkt) in &muxed_packets {
if let Packet::Binary { data, .. } = pkt {
ogg_data.extend_from_slice(data);
}
}
assert!(!ogg_data.is_empty());

let (demux_input_tx, demux_input_rx) = tokio::sync::mpsc::channel(10);
let mut demux_inputs = std::collections::HashMap::new();
demux_inputs.insert("in".to_string(), demux_input_rx);

let (demux_context, demux_sender, _demux_state_rx) = create_test_context(demux_inputs, 1);

let demux_node = Box::new(OggDemuxerNode::new(OggDemuxerConfig::default()));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 New Ogg round-trip tests bypass the symphonia demuxer selected by default registration

The new round-trip test directly instantiates OggDemuxerNode, but register_ogg_nodes registers SymphoniaOggDemuxerNode whenever the symphonia feature is enabled, which is part of the default feature set in crates/nodes/Cargo.toml:117-137 and selected at crates/nodes/src/containers/ogg.rs:837-855. This is not a runtime bug in the test itself, but it means the new coverage does not exercise the demuxer implementation most default builds actually expose under containers::ogg::demuxer. If the intent is regression coverage for the registered node, add a feature-aware test path or instantiate the symphonia demuxer when that feature is active.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation. The test intentionally uses OggDemuxerNode (the raw ogg crate demuxer) rather than SymphoniaOggDemuxerNode because both share the same external contract (Binary in → EncodedAudio Opus out) and this exercises the simpler code path. Adding a feature-gated test path for the symphonia demuxer is a reasonable follow-up but out of scope for this coverage sprint.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #508.


let demux_handle = tokio::spawn(async move { demux_node.run(demux_context).await });

demux_input_tx.send(create_test_binary_packet(ogg_data)).await.unwrap();
drop(demux_input_tx);

demux_handle.await.unwrap().unwrap();

let demuxed = demux_sender.collect_packets().await;
let data_packets: Vec<_> = demuxed
.iter()
.filter_map(|(_, _, pkt)| {
if let Packet::Binary { data, .. } = pkt {
if !data.is_empty()
&& !data.starts_with(b"OpusHead")
&& !data.starts_with(b"OpusTags")
{
return Some(data.clone());
}
}
None
})
.collect();

assert_eq!(
data_packets.len(),
original_payloads.len(),
"demuxer should output one data packet per muxed payload"
);

for (i, data) in data_packets.iter().enumerate() {
assert_eq!(data.as_ref(), &original_payloads[i], "round-trip payload {i} should match");
}
}
}
67 changes: 67 additions & 0 deletions crates/nodes/src/transport/moq/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,4 +930,71 @@ mod tests {
let vp9: MoqPushConfig = serde_json::from_str(r#"{"video_codec": "vp9"}"#).unwrap();
assert_eq!(vp9.video_codec, Some(VideoCodec::Vp9));
}

#[test]
fn moq_push_config_all_defaults() {
let config: super::MoqPushConfig = serde_json::from_str("{}").unwrap();
assert!(config.url.is_empty());
assert!(config.jwt.is_none());
assert!(config.broadcast.is_empty());
assert_eq!(config.channels, 2);
assert!(config.audio.is_none());
assert!(config.video.is_none());
assert!(config.video_codec.is_none());
assert!(config.audio_codec.is_none());
assert_eq!(config.group_duration_ms, 40);
assert_eq!(config.initial_delay_ms, 0);
}

#[test]
fn is_video_pin_empty_string() {
assert!(!is_video_pin(""));
}

#[test]
fn is_video_pin_slashes_only() {
assert!(!is_video_pin("/"));
assert!(!is_video_pin("///"));
}

#[test]
fn is_video_pin_prefix_is_case_sensitive() {
assert!(!is_video_pin("Video/hd"));
assert!(!is_video_pin("VIDEO/hd"));
}

#[test]
fn track_name_from_pin_empty_string() {
assert_eq!(track_name_from_pin(""), "audio/");
}

#[test]
fn track_name_from_pin_slashes_only() {
assert_eq!(track_name_from_pin("/"), "audio//");
assert_eq!(track_name_from_pin("video/"), "video/");
assert_eq!(track_name_from_pin("audio/"), "audio/");
Comment on lines +966 to +975
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 MoQ tests document permissive empty/slash track names

The added assertions lock in outputs like track_name_from_pin("") == "audio/" and track_name_from_pin("/") == "audio//". That matches the current helper and dynamic pin code path (crates/nodes/src/transport/moq/push.rs:633-644, crates/nodes/src/transport/moq/push.rs:764-768), so I did not flag it as a bug from this PR, but it is a noteworthy API-contract choice: if empty or slash-only pin names are invalid in the graph layer, these tests are harmless; if dynamic pin requests can pass them through, the MoQ catalog can publish odd track names rather than rejecting or normalizing them.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — these tests intentionally document the current behavior of the helpers with edge-case inputs. If the graph layer validates pin names before they reach track_name_from_pin, these are harmless. If not, that's a pre-existing concern outside this PR's scope.

}

#[test]
fn moq_push_config_full_deserialization() {
let json = r#"{
"url": "https://relay.example.com",
"jwt": "tok123",
"broadcast": "my-stream",
"channels": 1,
"audio": true,
"video": false,
"group_duration_ms": 100,
"initial_delay_ms": 50
}"#;
let config: super::MoqPushConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.url, "https://relay.example.com");
assert_eq!(config.jwt.as_deref(), Some("tok123"));
assert_eq!(config.broadcast, "my-stream");
assert_eq!(config.channels, 1);
assert_eq!(config.audio, Some(true));
assert_eq!(config.video, Some(false));
assert_eq!(config.group_duration_ms, 100);
assert_eq!(config.initial_delay_ms, 50);
}
}
107 changes: 107 additions & 0 deletions crates/nodes/src/video/colorbars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,5 +810,112 @@ mod tests {
assert_eq!(config.height, 480);
assert_eq!(config.fps, 30);
assert_eq!(config.frame_count, 0);
assert_eq!(config.pixel_format, "nv12");
assert!(!config.draw_time);
assert!(!config.animate);
}

#[test]
fn test_colorbars_config_custom_deserialization() {
let json = r#"{
"width": 1280,
"height": 720,
"fps": 60,
"frame_count": 10,
"pixel_format": "rgba8"
}"#;
let config: ColorBarsConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.width, 1280);
assert_eq!(config.height, 720);
assert_eq!(config.fps, 60);
assert_eq!(config.frame_count, 10);
assert_eq!(config.pixel_format, "rgba8");
}

#[test]
fn test_smpte_colorbars_nv12() {
let width = 640u32;
let height = 480u32;
let layout = streamkit_core::types::VideoLayout::packed(width, height, PixelFormat::Nv12);
let total = layout.total_bytes();
let mut data = vec![0u8; total];
generate_smpte_colorbars_nv12(width, height, &mut data, &layout);

// Y plane: first pixel should be white (Y=180).
assert_eq!(data[0], 180);
// Last bar (rightmost column) should be blue (Y=35).
let last_y_col = (width - 1) as usize;
assert_eq!(data[last_y_col], 35);

// UV plane should be non-zero (chroma data present).
let planes = layout.planes();
let uv_plane = planes[1];
let uv_start = uv_plane.offset;
let uv_len = uv_plane.stride * uv_plane.height as usize;
let uv_data = &data[uv_start..uv_start + uv_len];
assert!(uv_data.iter().any(|&b| b != 0), "UV plane should contain chroma data");
}

#[test]
fn test_smpte_colorbars_rgba8() {
let width = 640u32;
let height = 480u32;
let total = (width * height * 4) as usize;
let mut data = vec![0u8; total];
generate_smpte_colorbars_rgba8(width, height, &mut data);

// First pixel should be 75% white (191, 191, 191, 255).
assert_eq!(&data[0..4], &[191, 191, 191, 255]);
// Last column should be blue (0, 0, 191, 255).
let last_px = ((width - 1) as usize) * 4;
assert_eq!(&data[last_px..last_px + 4], &[0, 0, 191, 255]);

// All alpha values should be 255.
for px in data.chunks_exact(4) {
assert_eq!(px[3], 255, "alpha should always be 255");
}
}

#[tokio::test]
async fn test_colorbars_frame_count_limit() {
use crate::test_utils::create_oneshot_test_context;

let inputs = std::collections::HashMap::new();
let (mut context, mock_sender, mut state_rx) = create_oneshot_test_context(inputs, 1);

let (control_tx, control_rx) = tokio::sync::mpsc::channel(10);
context.control_rx = control_rx;

let config = ColorBarsConfig {
width: 32,
height: 32,
fps: 30,
frame_count: 5,
pixel_format: "i420".to_string(),
..ColorBarsConfig::default()
};
let pixel_format = parse_pixel_format(&config.pixel_format).unwrap();
let node = Box::new(ColorBarsNode { config, pixel_format });

let handle = tokio::spawn(async move { node.run(context).await });

// Drain Initializing + Ready states, then send Start.
crate::test_utils::assert_state_initializing(&mut state_rx).await;
crate::test_utils::assert_state_update(
&mut state_rx,
|s| matches!(s, streamkit_core::NodeState::Ready),
"Ready",
)
.await;
control_tx.send(streamkit_core::control::NodeControlMessage::Start).await.unwrap();
Comment on lines +884 to +910
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Colorbars frame-count test uses a manually replaced control channel

The new async colorbars test overwrites context.control_rx with its own channel before spawning the node, then waits for Ready and sends Start. This matches ColorBarsNode::run, which waits on context.control_rx.recv() after emitting Ready (crates/nodes/src/video/colorbars.rs:217-236), so it is not a bug. The important implication is that this test depends on directly mutating NodeContext; if the test utilities later expose the control sender, this pattern could be simplified and made less tightly coupled to the context fields.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — the control_rx replacement is the only way to drive the Start/Stop lifecycle in tests currently. If create_oneshot_test_context later exposes a control sender, this can be simplified.


handle.await.unwrap().unwrap();

let packets = mock_sender.collect_packets().await;
assert_eq!(packets.len(), 5, "should produce exactly 5 frames");

for (_, _, pkt) in &packets {
assert!(matches!(pkt, streamkit_core::types::Packet::Video(_)));
}
}
}
Loading
Loading