From 09e0f875c2e36a8eb01b5923aed62c32926cb45d Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sun, 24 May 2026 18:44:59 +0000 Subject: [PATCH 1/3] test(nodes): fill RTMP protocol test coverage gaps Add unit tests to increase coverage of the RTMP implementation: rtmp_client.rs: - send_audio() happy path (Publishing state) - send_audio() before Publishing (error path) - handle_error() transitions to Disconnecting - handle_on_status() with non-Publish.Start code - maybe_send_ack() when window exceeded - amf0_decode for unsupported EcmaArray marker - ChunkDecoder 2-byte and 3-byte basic header forms rtmp.rs: - stamp() with None timestamp (fallback behaviour) - Config defaults (sample_rate=48000, channels=2) - convert_annexb_to_avcc with SEI/AUD NAL types Signed-off-by: Devin AI Signed-off-by: streamkit-devin --- crates/nodes/src/transport/rtmp.rs | 65 ++++ crates/nodes/src/transport/rtmp_client.rs | 345 ++++++++++++++++++++++ 2 files changed, 410 insertions(+) diff --git a/crates/nodes/src/transport/rtmp.rs b/crates/nodes/src/transport/rtmp.rs index 97b21b13a..9a144aa45 100644 --- a/crates/nodes/src/transport/rtmp.rs +++ b/crates/nodes/src/transport/rtmp.rs @@ -1494,4 +1494,69 @@ mod tests { "video_data should be empty for SPS/PPS-only access units" ); } + + #[test] + fn stamp_with_no_metadata_defaults_to_zero() { + let mut state = RtmpTimestampState::new(); + let pkt = Packet::Binary { + data: bytes::Bytes::from_static(&[0]), + metadata: None, + content_type: None, + }; + let ts = state.stamp(&pkt, Track::Video, "test"); + assert_eq!(ts, 0); + } + + #[test] + fn config_default_sample_rate() { + assert_eq!(default_sample_rate(), 48_000); + } + + #[test] + fn config_default_channels() { + assert_eq!(default_channels(), 2); + } + + #[test] + fn convert_annexb_sei_and_aud_in_video_data() { + let mut annexb = Vec::new(); + // SEI NAL (type 6) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let sei = [0x06, 0x05, 0x04, 0x03]; + annexb.extend_from_slice(&sei); + // AUD NAL (type 9) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let aud = [0x09, 0x10]; + annexb.extend_from_slice(&aud); + // IDR slice (type 5) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let idr = [0x65, 0x88, 0x84]; + annexb.extend_from_slice(&idr); + + let result = convert_annexb_to_avcc(&annexb); + + assert!(result.sps_list.is_empty()); + assert!(result.pps_list.is_empty()); + + // All three NALs (SEI, AUD, IDR) should appear in video_data. + let avcc = &result.video_data; + let mut nal_types = Vec::new(); + let mut offset = 0; + while offset + 4 <= avcc.len() { + let len = u32::from_be_bytes([ + avcc[offset], + avcc[offset + 1], + avcc[offset + 2], + avcc[offset + 3], + ]) as usize; + offset += 4; + assert!(offset + len <= avcc.len(), "AVCC data truncated"); + nal_types.push(avcc[offset] & H264_NAL_TYPE_MASK); + offset += len; + } + + assert!(nal_types.contains(&6), "SEI should be in video_data"); + assert!(nal_types.contains(&9), "AUD should be in video_data"); + assert!(nal_types.contains(&5), "IDR should be in video_data"); + } } diff --git a/crates/nodes/src/transport/rtmp_client.rs b/crates/nodes/src/transport/rtmp_client.rs index 1811b6279..ecdc7874f 100644 --- a/crates/nodes/src/transport/rtmp_client.rs +++ b/crates/nodes/src/transport/rtmp_client.rs @@ -2227,6 +2227,102 @@ mod tests { assert_eq!(val, 320); } + /// Drive a connection through the full handshake → connect → createStream + /// → publish flow, returning it in `Publishing` state. Mirrors the + /// `full_youtube_server_simulation` test but returns the connection and + /// server encoder for further use. + fn drive_to_publishing() -> (RtmpPublishClientConnection, ChunkEncoder) { + let url = RtmpUrl::parse("rtmp://x.rtmp.youtube.com/live2/stream-key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xBB; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + let mut srv_enc = ChunkEncoder::new(); + + let win_ack = server_encode( + &mut srv_enc, + 2, + MSG_WIN_ACK_SIZE, + 0, + 2_500_000u32.to_be_bytes().to_vec(), + ); + let mut set_bw_payload = 59_768_832u32.to_be_bytes().to_vec(); + set_bw_payload.push(2); + let set_bw = server_encode(&mut srv_enc, 2, MSG_SET_PEER_BANDWIDTH, 0, set_bw_payload); + let mut server_msg = Vec::new(); + server_msg.extend_from_slice(&win_ack); + server_msg.extend_from_slice(&set_bw); + conn.feed_recv_buf(&server_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // connect _result + let mut result_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut result_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("fmsVer".to_string(), Amf0Value::String("FMS/3,5,7,7009".to_string())), + ("capabilities".to_string(), Amf0Value::Number(31.0)), + ]), + &mut result_payload, + ) + .unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Success".to_string()), + ), + ("description".to_string(), Amf0Value::String("Connection succeeded".to_string())), + ]), + &mut result_payload, + ) + .unwrap(); + let result_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, result_payload); + conn.feed_recv_buf(&result_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // createStream _result + let mut cs_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(2.0), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut cs_payload).unwrap(); + let cs_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, cs_payload); + conn.feed_recv_buf(&cs_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // onStatus → NetStream.Publish.Start + let mut status_payload = Vec::new(); + amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut status_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ("code".to_string(), Amf0Value::String("NetStream.Publish.Start".to_string())), + ("description".to_string(), Amf0Value::String("Publishing".to_string())), + ]), + &mut status_payload, + ) + .unwrap(); + let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); + conn.feed_recv_buf(&status_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + assert_eq!(conn.state(), RtmpConnectionState::Publishing); + (conn, srv_enc) + } + /// Build an RTMP chunk from scratch using our encoder, simulating /// a server sending a message. Returns the raw bytes ready to feed into /// a client connection's `feed_recv_buf`. @@ -2366,4 +2462,253 @@ mod tests { conn.send_video(&video).unwrap(); assert!(!conn.send_buf().is_empty()); } + + #[test] + fn connection_send_audio_happy_path() { + let (mut conn, _srv_enc) = drive_to_publishing(); + + let frame = AudioFrame { + timestamp: RtmpTimestamp::from_millis(0), + format: AudioFormat::Aac, + sample_rate: AudioSampleRate::Khz44, + is_8bit_sample: false, + is_stereo: true, + is_aac_sequence_header: true, + data: vec![0x11, 0x90], + }; + conn.send_audio(&frame).unwrap(); + assert!(!conn.send_buf().is_empty()); + } + + #[test] + fn connection_send_audio_before_publishing_errors() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + let frame = AudioFrame { + timestamp: RtmpTimestamp::from_millis(0), + format: AudioFormat::Aac, + sample_rate: AudioSampleRate::Khz44, + is_8bit_sample: false, + is_stereo: true, + is_aac_sequence_header: false, + data: vec![0; 10], + }; + assert!(conn.send_audio(&frame).is_err()); + } + + #[test] + fn handle_error_transitions_to_disconnecting() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xAA; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + assert_eq!(conn.state(), RtmpConnectionState::Connecting); + conn.advance_send_buf(conn.send_buf().len()); + + // Server sends _error instead of _result after connect. + let mut srv_enc = ChunkEncoder::new(); + let mut error_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_error".to_string()), &mut error_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut error_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut error_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("error".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Rejected".to_string()), + ), + ("description".to_string(), Amf0Value::String("Connection refused".to_string())), + ]), + &mut error_payload, + ) + .unwrap(); + let error_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, error_payload); + conn.feed_recv_buf(&error_msg).unwrap(); + + assert_eq!(conn.state(), RtmpConnectionState::Disconnecting); + + let mut found_disconnect = false; + while let Some(evt) = conn.next_event() { + if let RtmpConnectionEvent::DisconnectedByPeer { reason } = evt { + assert!(reason.contains("Connection refused")); + found_disconnect = true; + } + } + assert!(found_disconnect, "expected DisconnectedByPeer event"); + } + + #[test] + fn handle_on_status_bad_name_does_not_publish() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xAA; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + let mut srv_enc = ChunkEncoder::new(); + + // connect _result + let mut result_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Object(vec![]), &mut result_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Success".to_string()), + ), + ]), + &mut result_payload, + ) + .unwrap(); + let result_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, result_payload); + conn.feed_recv_buf(&result_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // createStream _result + let mut cs_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(2.0), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut cs_payload).unwrap(); + let cs_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, cs_payload); + conn.feed_recv_buf(&cs_msg).unwrap(); + assert_eq!(conn.state(), RtmpConnectionState::PublishPending); + conn.advance_send_buf(conn.send_buf().len()); + + // Server sends onStatus with NetStream.Publish.BadName (contains "Error" + // variant path) — should NOT transition to Publishing. + let mut status_payload = Vec::new(); + amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut status_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ("code".to_string(), Amf0Value::String("NetStream.Publish.BadName".to_string())), + ("description".to_string(), Amf0Value::String("Bad stream name".to_string())), + ]), + &mut status_payload, + ) + .unwrap(); + let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); + conn.feed_recv_buf(&status_msg).unwrap(); + + assert_ne!(conn.state(), RtmpConnectionState::Publishing); + } + + #[test] + fn maybe_send_ack_when_window_exceeded() { + let (mut conn, mut srv_enc) = drive_to_publishing(); + + // By this point total_bytes_received > 0 and last_ack_sent_at == 0. + // Set a tiny ACK window so the next feed_recv_buf triggers an ACK. + let win_ack = + server_encode(&mut srv_enc, 2, MSG_WIN_ACK_SIZE, 0, 10u32.to_be_bytes().to_vec()); + conn.feed_recv_buf(&win_ack).unwrap(); + + // The ACK should already be in send_buf (total_bytes_received + // from the publishing flow far exceeds 10 bytes). + let buf = conn.send_buf(); + assert!(!buf.is_empty(), "expected ACK in send_buf after exceeding window"); + + let mut dec = ChunkDecoder::new(); + dec.push(buf); + let mut found_ack = false; + while let Ok(Some(msg)) = dec.decode_message() { + if msg.msg_type_id == MSG_ACK { + found_ack = true; + } + } + assert!(found_ack, "expected an ACK message in send_buf"); + } + + #[test] + fn amf0_decode_ecma_array_unsupported() { + // AMF0 EcmaArray (type marker 0x08) is not supported by this + // minimal codec. Verify it produces a descriptive error. + let mut buf = Vec::new(); + buf.push(0x08); // EcmaArray marker + buf.extend_from_slice(&4u32.to_be_bytes()); // associative-count + // key-value pair: "key" → Number(1.0) + buf.extend_from_slice(&2u16.to_be_bytes()); + buf.extend_from_slice(b"ab"); + buf.push(AMF0_NUMBER); + buf.extend_from_slice(&1.0f64.to_be_bytes()); + buf.extend_from_slice(&AMF0_OBJECT_END); + + let err = amf0_decode(&buf).unwrap_err(); + assert!( + err.to_string().contains("0x08"), + "error should mention the unsupported marker: {err}" + ); + } + + #[test] + fn chunk_decode_2byte_basic_header() { + let mut enc = ChunkEncoder::new(); + let payload = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let msg = OutboundMessage { + csid: 64, // 2-byte basic header form (csid 64..319) + timestamp: 10, + msg_type_id: MSG_COMMAND_AMF0, + stream_id: 0, + payload: payload.clone(), + }; + let mut wire = Vec::new(); + enc.encode_message(&msg, &mut wire); + + // Verify the basic header uses the 2-byte form. + assert_eq!(wire[0] & 0x3F, 0, "csid field should be 0 for 2-byte form"); + + let mut dec = ChunkDecoder::new(); + dec.push(&wire); + let decoded = dec.decode_message().unwrap().unwrap(); + assert_eq!(decoded.payload, payload); + assert_eq!(decoded.msg_type_id, MSG_COMMAND_AMF0); + assert_eq!(decoded.timestamp, 10); + } + + #[test] + fn chunk_decode_3byte_basic_header() { + let mut enc = ChunkEncoder::new(); + let payload = vec![0xCA, 0xFE, 0xBA, 0xBE]; + let msg = OutboundMessage { + csid: 320, // 3-byte basic header form (csid >= 320) + timestamp: 20, + msg_type_id: MSG_VIDEO, + stream_id: 1, + payload: payload.clone(), + }; + let mut wire = Vec::new(); + enc.encode_message(&msg, &mut wire); + + // Verify the basic header uses the 3-byte form. + assert_eq!(wire[0] & 0x3F, 1, "csid field should be 1 for 3-byte form"); + + let mut dec = ChunkDecoder::new(); + dec.push(&wire); + let decoded = dec.decode_message().unwrap().unwrap(); + assert_eq!(decoded.payload, payload); + assert_eq!(decoded.msg_type_id, MSG_VIDEO); + assert_eq!(decoded.stream_id, 1); + assert_eq!(decoded.timestamp, 20); + } } From 32eca866c0c5d1c012964faa9295591993953a6c Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sun, 24 May 2026 18:51:34 +0000 Subject: [PATCH 2/3] =?UTF-8?q?test(nodes):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20pin=20BadName=20behavior,=20remove=20redundant=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - BadName test now asserts PublishPending (current production behavior) with a comment noting the catch-all arm gap - Removed line-narration comments on basic header assertions Signed-off-by: Devin AI Signed-off-by: streamkit-devin --- crates/nodes/src/transport/rtmp_client.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/nodes/src/transport/rtmp_client.rs b/crates/nodes/src/transport/rtmp_client.rs index ecdc7874f..db7a6df0f 100644 --- a/crates/nodes/src/transport/rtmp_client.rs +++ b/crates/nodes/src/transport/rtmp_client.rs @@ -2593,8 +2593,10 @@ mod tests { assert_eq!(conn.state(), RtmpConnectionState::PublishPending); conn.advance_send_buf(conn.send_buf().len()); - // Server sends onStatus with NetStream.Publish.BadName (contains "Error" - // variant path) — should NOT transition to Publishing. + // Server sends onStatus with NetStream.Publish.BadName. + // Current production code only treats codes containing "Error", + // "Failed", or "Rejected" as terminal, so BadName falls through + // to the catch-all arm and the connection stays PublishPending. let mut status_payload = Vec::new(); amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); @@ -2611,6 +2613,7 @@ mod tests { let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); conn.feed_recv_buf(&status_msg).unwrap(); + assert_eq!(conn.state(), RtmpConnectionState::PublishPending); assert_ne!(conn.state(), RtmpConnectionState::Publishing); } @@ -2675,7 +2678,6 @@ mod tests { let mut wire = Vec::new(); enc.encode_message(&msg, &mut wire); - // Verify the basic header uses the 2-byte form. assert_eq!(wire[0] & 0x3F, 0, "csid field should be 0 for 2-byte form"); let mut dec = ChunkDecoder::new(); @@ -2700,7 +2702,6 @@ mod tests { let mut wire = Vec::new(); enc.encode_message(&msg, &mut wire); - // Verify the basic header uses the 3-byte form. assert_eq!(wire[0] & 0x3F, 1, "csid field should be 1 for 3-byte form"); let mut dec = ChunkDecoder::new(); From a95ff3854c60d0b51192371038437f73acdaf2d1 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Mon, 25 May 2026 12:06:36 +0000 Subject: [PATCH 3/3] test(nodes): trim narration comments per review - Simplify EcmaArray test fixture to single-byte marker (decode errors at byte 0, extra bytes were never read) - Remove restating comment above AVCC NAL-type assertions Signed-off-by: Devin AI Signed-off-by: streamkit-devin --- crates/nodes/src/transport/rtmp.rs | 1 - crates/nodes/src/transport/rtmp_client.rs | 10 +--------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/crates/nodes/src/transport/rtmp.rs b/crates/nodes/src/transport/rtmp.rs index 9a144aa45..f40f0aeed 100644 --- a/crates/nodes/src/transport/rtmp.rs +++ b/crates/nodes/src/transport/rtmp.rs @@ -1538,7 +1538,6 @@ mod tests { assert!(result.sps_list.is_empty()); assert!(result.pps_list.is_empty()); - // All three NALs (SEI, AUD, IDR) should appear in video_data. let avcc = &result.video_data; let mut nal_types = Vec::new(); let mut offset = 0; diff --git a/crates/nodes/src/transport/rtmp_client.rs b/crates/nodes/src/transport/rtmp_client.rs index db7a6df0f..c65b0edc2 100644 --- a/crates/nodes/src/transport/rtmp_client.rs +++ b/crates/nodes/src/transport/rtmp_client.rs @@ -2647,15 +2647,7 @@ mod tests { fn amf0_decode_ecma_array_unsupported() { // AMF0 EcmaArray (type marker 0x08) is not supported by this // minimal codec. Verify it produces a descriptive error. - let mut buf = Vec::new(); - buf.push(0x08); // EcmaArray marker - buf.extend_from_slice(&4u32.to_be_bytes()); // associative-count - // key-value pair: "key" → Number(1.0) - buf.extend_from_slice(&2u16.to_be_bytes()); - buf.extend_from_slice(b"ab"); - buf.push(AMF0_NUMBER); - buf.extend_from_slice(&1.0f64.to_be_bytes()); - buf.extend_from_slice(&AMF0_OBJECT_END); + let buf = [0x08u8]; let err = amf0_decode(&buf).unwrap_err(); assert!(