diff --git a/crates/sof-observer/fuzz/fuzz_targets/shred_fec_recover.rs b/crates/sof-observer/fuzz/fuzz_targets/shred_fec_recover.rs index 3b88de36..1a406739 100644 --- a/crates/sof-observer/fuzz/fuzz_targets/shred_fec_recover.rs +++ b/crates/sof-observer/fuzz/fuzz_targets/shred_fec_recover.rs @@ -234,8 +234,8 @@ fuzz_target!(|bytes: &[u8]| { assert!(recoverer.tracked_sets() <= max_tracked_sets); for recovered_packet in recovered { - assert_eq!(recovered_packet.len(), SIZE_OF_DATA_SHRED_PAYLOAD); - let parsed = parse_shred(&recovered_packet); + assert_eq!(recovered_packet.bytes.len(), SIZE_OF_DATA_SHRED_PAYLOAD); + let parsed = parse_shred(&recovered_packet.bytes); assert!(matches!(parsed, Ok(ParsedShred::Data(_)))); } } diff --git a/crates/sof-observer/src/app/runtime/prelude.rs b/crates/sof-observer/src/app/runtime/prelude.rs index 59838c11..b3bb6f02 100644 --- a/crates/sof-observer/src/app/runtime/prelude.rs +++ b/crates/sof-observer/src/app/runtime/prelude.rs @@ -43,7 +43,7 @@ pub(super) use crate::{ repair::MissingShredTracker, shred::{ fec::FecRecoverer, - wire::{ParseError, ParsedShred, ParsedShredHeader, parse_shred, parse_shred_header}, + wire::{ParseError, ParsedShredHeader, parse_shred_header}, }, verify::{ShredVerifier, VerifyStatus}, }; diff --git a/crates/sof-observer/src/app/runtime/runloop/packet_workers.rs b/crates/sof-observer/src/app/runtime/runloop/packet_workers.rs index 5f560a61..efc38d6a 100644 --- a/crates/sof-observer/src/app/runtime/runloop/packet_workers.rs +++ b/crates/sof-observer/src/app/runtime/runloop/packet_workers.rs @@ -515,14 +515,10 @@ where push_primary_shred(packet, &mut accepted_shreds); for recovered in recovered_packets { - let parsed_recovered = match parse_shred(&recovered) { - Ok(parsed) => parsed, - Err(_) => continue, - }; if verify_recovered_shreds { let recovered_accepted = match verify_packet_with_counters( shred_verifier.as_deref_mut(), - &recovered, + &recovered.bytes, observed_at, verify_strict_unknown, &mut verify_counters, @@ -534,40 +530,46 @@ where continue; } } - if let ParsedShred::Data(data) = parsed_recovered { - let Some(signature) = packet_signature_bytes(&recovered) else { - continue; - }; - let Some(variant) = packet_variant_byte(&recovered) else { - continue; - }; - #[cfg(feature = "gossip-bootstrap")] - maybe_record_observed_leader( - shred_verifier.as_deref(), - data.common.slot, - &mut observed_slot_leaders, - ); - accepted_shreds.push(WorkerAcceptedShred { - source: None, - observed_at, - slot: data.common.slot, - index: data.common.index, - fec_set_index: data.common.fec_set_index, - version: data.common.version, - variant, - signature, - kind: WorkerAcceptedShredKind::RecoveredData { - parent_slot: derive_parent_slot( - data.common.slot, - data.data_header.parent_offset, - ), - data_complete: data.data_header.data_complete(), - last_in_slot: data.data_header.last_in_slot(), - reference_tick: data.data_header.reference_tick(), - }, - payload_fragment: Some(SharedPayloadFragment::owned(data.payload)), - }); - } + let Some(signature) = packet_signature_bytes(&recovered.bytes) else { + continue; + }; + let Some(variant) = packet_variant_byte(&recovered.bytes) else { + continue; + }; + #[cfg(feature = "gossip-bootstrap")] + maybe_record_observed_leader( + shred_verifier.as_deref(), + recovered.parsed.common.slot, + &mut observed_slot_leaders, + ); + let packet_bytes: Arc<[u8]> = Arc::from(recovered.bytes); + let Some(payload_fragment) = SharedPayloadFragment::borrowed( + Arc::clone(&packet_bytes), + recovered.parsed.payload_offset, + recovered.parsed.payload_len, + ) else { + continue; + }; + accepted_shreds.push(WorkerAcceptedShred { + source: None, + observed_at, + slot: recovered.parsed.common.slot, + index: recovered.parsed.common.index, + fec_set_index: recovered.parsed.common.fec_set_index, + version: recovered.parsed.common.version, + variant, + signature, + kind: WorkerAcceptedShredKind::RecoveredData { + parent_slot: derive_parent_slot( + recovered.parsed.common.slot, + recovered.parsed.data_header.parent_offset, + ), + data_complete: recovered.parsed.data_header.data_complete(), + last_in_slot: recovered.parsed.data_header.last_in_slot(), + reference_tick: recovered.parsed.data_header.reference_tick(), + }, + payload_fragment: Some(payload_fragment), + }); } } diff --git a/crates/sof-observer/src/shred/fec/core.rs b/crates/sof-observer/src/shred/fec/core.rs index cc69e738..6ad68f87 100644 --- a/crates/sof-observer/src/shred/fec/core.rs +++ b/crates/sof-observer/src/shred/fec/core.rs @@ -7,7 +7,7 @@ use crate::shred::wire::{ParsedShredHeader, ShredVariant}; #[path = "recover.rs"] mod recover; -use recover::{parse_packet_signature, recover_missing_data}; +use recover::{RecoveredDataPacket, parse_packet_signature, recover_missing_data}; const SIZE_OF_SIGNATURE: usize = 64; const SIZE_OF_MERKLE_ROOT: usize = 32; @@ -69,7 +69,11 @@ impl FecRecoverer { } } - pub fn ingest_packet(&mut self, packet: &[u8], parsed: &ParsedShredHeader) -> Vec> { + pub fn ingest_packet( + &mut self, + packet: &[u8], + parsed: &ParsedShredHeader, + ) -> Vec { let signature = match parse_packet_signature(packet) { Some(signature) => signature, None => return Vec::new(), diff --git a/crates/sof-observer/src/shred/fec/recover.rs b/crates/sof-observer/src/shred/fec/recover.rs index b9f8e630..6f36c4f9 100644 --- a/crates/sof-observer/src/shred/fec/recover.rs +++ b/crates/sof-observer/src/shred/fec/recover.rs @@ -1,11 +1,19 @@ use super::*; -use crate::shred::wire::{ParsedShred, SIZE_OF_DATA_SHRED_PAYLOAD, parse_shred}; +use crate::shred::wire::{ + ParsedDataShredHeader, ParsedShredHeader, SIZE_OF_DATA_SHRED_PAYLOAD, parse_shred_header, +}; + +#[derive(Debug)] +pub struct RecoveredDataPacket { + pub bytes: Vec, + pub parsed: ParsedDataShredHeader, +} pub(super) fn recover_missing_data( set: &mut ErasureSet, fec_set_index: u32, reed_solomon_cache: &mut HashMap<(usize, usize), ReedSolomon>, -) -> Option>> { +) -> Option> { let config = set.config?; let variant = set.variant?; if config.num_data == 0 || config.num_coding == 0 { @@ -106,7 +114,7 @@ fn build_recovered_data_shred( erasure_shard: &[u8], leader_signature: &[u8; SIZE_OF_SIGNATURE], payload_len: usize, -) -> Option> { +) -> Option { let mut recovered = vec![0_u8; payload_len]; recovered .get_mut(..SIZE_OF_SIGNATURE)? @@ -116,9 +124,12 @@ fn build_recovered_data_shred( recovered .get_mut(start..end)? .copy_from_slice(erasure_shard); - match parse_shred(&recovered) { - Ok(ParsedShred::Data(_)) => Some(recovered), - Ok(ParsedShred::Code(_)) | Err(_) => None, + match parse_shred_header(&recovered) { + Ok(ParsedShredHeader::Data(parsed)) => Some(RecoveredDataPacket { + bytes: recovered, + parsed, + }), + Ok(ParsedShredHeader::Code(_)) | Err(_) => None, } }