diff --git a/crates/nodes/src/transport/rtmp.rs b/crates/nodes/src/transport/rtmp.rs index 97b21b13a..f40f0aeed 100644 --- a/crates/nodes/src/transport/rtmp.rs +++ b/crates/nodes/src/transport/rtmp.rs @@ -1494,4 +1494,68 @@ mod tests { "video_data should be empty for SPS/PPS-only access units" ); } + + #[test] + fn stamp_with_no_metadata_defaults_to_zero() { + let mut state = RtmpTimestampState::new(); + let pkt = Packet::Binary { + data: bytes::Bytes::from_static(&[0]), + metadata: None, + content_type: None, + }; + let ts = state.stamp(&pkt, Track::Video, "test"); + assert_eq!(ts, 0); + } + + #[test] + fn config_default_sample_rate() { + assert_eq!(default_sample_rate(), 48_000); + } + + #[test] + fn config_default_channels() { + assert_eq!(default_channels(), 2); + } + + #[test] + fn convert_annexb_sei_and_aud_in_video_data() { + let mut annexb = Vec::new(); + // SEI NAL (type 6) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let sei = [0x06, 0x05, 0x04, 0x03]; + annexb.extend_from_slice(&sei); + // AUD NAL (type 9) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let aud = [0x09, 0x10]; + annexb.extend_from_slice(&aud); + // IDR slice (type 5) + annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); + let idr = [0x65, 0x88, 0x84]; + annexb.extend_from_slice(&idr); + + let result = convert_annexb_to_avcc(&annexb); + + assert!(result.sps_list.is_empty()); + assert!(result.pps_list.is_empty()); + + let avcc = &result.video_data; + let mut nal_types = Vec::new(); + let mut offset = 0; + while offset + 4 <= avcc.len() { + let len = u32::from_be_bytes([ + avcc[offset], + avcc[offset + 1], + avcc[offset + 2], + avcc[offset + 3], + ]) as usize; + offset += 4; + assert!(offset + len <= avcc.len(), "AVCC data truncated"); + nal_types.push(avcc[offset] & H264_NAL_TYPE_MASK); + offset += len; + } + + assert!(nal_types.contains(&6), "SEI should be in video_data"); + assert!(nal_types.contains(&9), "AUD should be in video_data"); + assert!(nal_types.contains(&5), "IDR should be in video_data"); + } } diff --git a/crates/nodes/src/transport/rtmp_client.rs b/crates/nodes/src/transport/rtmp_client.rs index 1811b6279..c65b0edc2 100644 --- a/crates/nodes/src/transport/rtmp_client.rs +++ b/crates/nodes/src/transport/rtmp_client.rs @@ -2227,6 +2227,102 @@ mod tests { assert_eq!(val, 320); } + /// Drive a connection through the full handshake → connect → createStream + /// → publish flow, returning it in `Publishing` state. Mirrors the + /// `full_youtube_server_simulation` test but returns the connection and + /// server encoder for further use. + fn drive_to_publishing() -> (RtmpPublishClientConnection, ChunkEncoder) { + let url = RtmpUrl::parse("rtmp://x.rtmp.youtube.com/live2/stream-key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xBB; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + let mut srv_enc = ChunkEncoder::new(); + + let win_ack = server_encode( + &mut srv_enc, + 2, + MSG_WIN_ACK_SIZE, + 0, + 2_500_000u32.to_be_bytes().to_vec(), + ); + let mut set_bw_payload = 59_768_832u32.to_be_bytes().to_vec(); + set_bw_payload.push(2); + let set_bw = server_encode(&mut srv_enc, 2, MSG_SET_PEER_BANDWIDTH, 0, set_bw_payload); + let mut server_msg = Vec::new(); + server_msg.extend_from_slice(&win_ack); + server_msg.extend_from_slice(&set_bw); + conn.feed_recv_buf(&server_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // connect _result + let mut result_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut result_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("fmsVer".to_string(), Amf0Value::String("FMS/3,5,7,7009".to_string())), + ("capabilities".to_string(), Amf0Value::Number(31.0)), + ]), + &mut result_payload, + ) + .unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Success".to_string()), + ), + ("description".to_string(), Amf0Value::String("Connection succeeded".to_string())), + ]), + &mut result_payload, + ) + .unwrap(); + let result_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, result_payload); + conn.feed_recv_buf(&result_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // createStream _result + let mut cs_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(2.0), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut cs_payload).unwrap(); + let cs_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, cs_payload); + conn.feed_recv_buf(&cs_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // onStatus → NetStream.Publish.Start + let mut status_payload = Vec::new(); + amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut status_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ("code".to_string(), Amf0Value::String("NetStream.Publish.Start".to_string())), + ("description".to_string(), Amf0Value::String("Publishing".to_string())), + ]), + &mut status_payload, + ) + .unwrap(); + let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); + conn.feed_recv_buf(&status_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + assert_eq!(conn.state(), RtmpConnectionState::Publishing); + (conn, srv_enc) + } + /// Build an RTMP chunk from scratch using our encoder, simulating /// a server sending a message. Returns the raw bytes ready to feed into /// a client connection's `feed_recv_buf`. @@ -2366,4 +2462,246 @@ mod tests { conn.send_video(&video).unwrap(); assert!(!conn.send_buf().is_empty()); } + + #[test] + fn connection_send_audio_happy_path() { + let (mut conn, _srv_enc) = drive_to_publishing(); + + let frame = AudioFrame { + timestamp: RtmpTimestamp::from_millis(0), + format: AudioFormat::Aac, + sample_rate: AudioSampleRate::Khz44, + is_8bit_sample: false, + is_stereo: true, + is_aac_sequence_header: true, + data: vec![0x11, 0x90], + }; + conn.send_audio(&frame).unwrap(); + assert!(!conn.send_buf().is_empty()); + } + + #[test] + fn connection_send_audio_before_publishing_errors() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + let frame = AudioFrame { + timestamp: RtmpTimestamp::from_millis(0), + format: AudioFormat::Aac, + sample_rate: AudioSampleRate::Khz44, + is_8bit_sample: false, + is_stereo: true, + is_aac_sequence_header: false, + data: vec![0; 10], + }; + assert!(conn.send_audio(&frame).is_err()); + } + + #[test] + fn handle_error_transitions_to_disconnecting() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xAA; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + assert_eq!(conn.state(), RtmpConnectionState::Connecting); + conn.advance_send_buf(conn.send_buf().len()); + + // Server sends _error instead of _result after connect. + let mut srv_enc = ChunkEncoder::new(); + let mut error_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_error".to_string()), &mut error_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut error_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut error_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("error".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Rejected".to_string()), + ), + ("description".to_string(), Amf0Value::String("Connection refused".to_string())), + ]), + &mut error_payload, + ) + .unwrap(); + let error_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, error_payload); + conn.feed_recv_buf(&error_msg).unwrap(); + + assert_eq!(conn.state(), RtmpConnectionState::Disconnecting); + + let mut found_disconnect = false; + while let Some(evt) = conn.next_event() { + if let RtmpConnectionEvent::DisconnectedByPeer { reason } = evt { + assert!(reason.contains("Connection refused")); + found_disconnect = true; + } + } + assert!(found_disconnect, "expected DisconnectedByPeer event"); + } + + #[test] + fn handle_on_status_bad_name_does_not_publish() { + let url = RtmpUrl::parse("rtmp://127.0.0.1/live/key").unwrap(); + let mut conn = RtmpPublishClientConnection::new(url); + + let c0c1 = conn.send_buf().to_vec(); + conn.advance_send_buf(c0c1.len()); + + let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); + s0s1s2.push(0x03); + s0s1s2.extend_from_slice(&vec![0xAA; HANDSHAKE_SIZE]); + s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); + conn.feed_recv_buf(&s0s1s2).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + let mut srv_enc = ChunkEncoder::new(); + + // connect _result + let mut result_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut result_payload).unwrap(); + amf0_encode(&Amf0Value::Object(vec![]), &mut result_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ( + "code".to_string(), + Amf0Value::String("NetConnection.Connect.Success".to_string()), + ), + ]), + &mut result_payload, + ) + .unwrap(); + let result_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, result_payload); + conn.feed_recv_buf(&result_msg).unwrap(); + conn.advance_send_buf(conn.send_buf().len()); + + // createStream _result + let mut cs_payload = Vec::new(); + amf0_encode(&Amf0Value::String("_result".to_string()), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(2.0), &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut cs_payload).unwrap(); + amf0_encode(&Amf0Value::Number(1.0), &mut cs_payload).unwrap(); + let cs_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, cs_payload); + conn.feed_recv_buf(&cs_msg).unwrap(); + assert_eq!(conn.state(), RtmpConnectionState::PublishPending); + conn.advance_send_buf(conn.send_buf().len()); + + // Server sends onStatus with NetStream.Publish.BadName. + // Current production code only treats codes containing "Error", + // "Failed", or "Rejected" as terminal, so BadName falls through + // to the catch-all arm and the connection stays PublishPending. + let mut status_payload = Vec::new(); + amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); + amf0_encode(&Amf0Value::Null, &mut status_payload).unwrap(); + amf0_encode( + &Amf0Value::Object(vec![ + ("level".to_string(), Amf0Value::String("status".to_string())), + ("code".to_string(), Amf0Value::String("NetStream.Publish.BadName".to_string())), + ("description".to_string(), Amf0Value::String("Bad stream name".to_string())), + ]), + &mut status_payload, + ) + .unwrap(); + let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); + conn.feed_recv_buf(&status_msg).unwrap(); + + assert_eq!(conn.state(), RtmpConnectionState::PublishPending); + assert_ne!(conn.state(), RtmpConnectionState::Publishing); + } + + #[test] + fn maybe_send_ack_when_window_exceeded() { + let (mut conn, mut srv_enc) = drive_to_publishing(); + + // By this point total_bytes_received > 0 and last_ack_sent_at == 0. + // Set a tiny ACK window so the next feed_recv_buf triggers an ACK. + let win_ack = + server_encode(&mut srv_enc, 2, MSG_WIN_ACK_SIZE, 0, 10u32.to_be_bytes().to_vec()); + conn.feed_recv_buf(&win_ack).unwrap(); + + // The ACK should already be in send_buf (total_bytes_received + // from the publishing flow far exceeds 10 bytes). + let buf = conn.send_buf(); + assert!(!buf.is_empty(), "expected ACK in send_buf after exceeding window"); + + let mut dec = ChunkDecoder::new(); + dec.push(buf); + let mut found_ack = false; + while let Ok(Some(msg)) = dec.decode_message() { + if msg.msg_type_id == MSG_ACK { + found_ack = true; + } + } + assert!(found_ack, "expected an ACK message in send_buf"); + } + + #[test] + fn amf0_decode_ecma_array_unsupported() { + // AMF0 EcmaArray (type marker 0x08) is not supported by this + // minimal codec. Verify it produces a descriptive error. + let buf = [0x08u8]; + + let err = amf0_decode(&buf).unwrap_err(); + assert!( + err.to_string().contains("0x08"), + "error should mention the unsupported marker: {err}" + ); + } + + #[test] + fn chunk_decode_2byte_basic_header() { + let mut enc = ChunkEncoder::new(); + let payload = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let msg = OutboundMessage { + csid: 64, // 2-byte basic header form (csid 64..319) + timestamp: 10, + msg_type_id: MSG_COMMAND_AMF0, + stream_id: 0, + payload: payload.clone(), + }; + let mut wire = Vec::new(); + enc.encode_message(&msg, &mut wire); + + assert_eq!(wire[0] & 0x3F, 0, "csid field should be 0 for 2-byte form"); + + let mut dec = ChunkDecoder::new(); + dec.push(&wire); + let decoded = dec.decode_message().unwrap().unwrap(); + assert_eq!(decoded.payload, payload); + assert_eq!(decoded.msg_type_id, MSG_COMMAND_AMF0); + assert_eq!(decoded.timestamp, 10); + } + + #[test] + fn chunk_decode_3byte_basic_header() { + let mut enc = ChunkEncoder::new(); + let payload = vec![0xCA, 0xFE, 0xBA, 0xBE]; + let msg = OutboundMessage { + csid: 320, // 3-byte basic header form (csid >= 320) + timestamp: 20, + msg_type_id: MSG_VIDEO, + stream_id: 1, + payload: payload.clone(), + }; + let mut wire = Vec::new(); + enc.encode_message(&msg, &mut wire); + + assert_eq!(wire[0] & 0x3F, 1, "csid field should be 1 for 3-byte form"); + + let mut dec = ChunkDecoder::new(); + dec.push(&wire); + let decoded = dec.decode_message().unwrap().unwrap(); + assert_eq!(decoded.payload, payload); + assert_eq!(decoded.msg_type_id, MSG_VIDEO); + assert_eq!(decoded.stream_id, 1); + assert_eq!(decoded.timestamp, 20); + } }