From 8ae6760cba22ada5b757f467d3aab6f5517bb9e7 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 2 Jun 2026 20:37:15 +0800 Subject: [PATCH 01/16] Revert workaround avoiding NoRoom from WindowAdjust It will be replace by a proper fix. This reverts commit 636c25edf9d4dfe516c927112783db541dfb1765. "Merge branch 'matt/sftptesting' into dev/sftp-start" --- src/channel.rs | 18 ++++-------------- src/encrypt.rs | 4 +--- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 05fc06d..04c8ac5 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -211,16 +211,7 @@ impl Channels { let ch = self.get_mut(num)?; ch.finished_input(len); if let Some(w) = ch.check_window_adjust()? { - // The send buffer may be full. Ignore the failure and hope another adjustment is - // sent later. TODO improve this. - match s.send(w) { - Ok(_) => ch.pending_adjust = 0, - Err(Error::NoRoom { .. }) => { - // TODO better retry rather than hoping a retry occurs - debug!("noroom for adjustment") - } - error => return error, - } + s.send(w)?; } Ok(()) } @@ -1040,12 +1031,11 @@ impl Channel { } /// Returns a window adjustment packet if required - /// - /// Does not reset the adjustment to 0, should be done by caller on successful send. - fn check_window_adjust(&self) -> Result>> { - let num = self.send.as_ref().trap()?.num; + fn check_window_adjust(&mut self) -> Result>> { + let num = self.send.as_mut().trap()?.num; if self.pending_adjust > self.full_window / 2 { let adjust = self.pending_adjust as u32; + self.pending_adjust = 0; let p = packets::ChannelWindowAdjust { num, adjust }.into(); Ok(Some(p)) } else { diff --git a/src/encrypt.rs b/src/encrypt.rs index ff43166..632506c 100644 --- a/src/encrypt.rs +++ b/src/encrypt.rs @@ -127,9 +127,7 @@ impl KeyState { buf: &mut [u8], ) -> Result { let e = self.enc.encrypt(payload_len, buf, self.seq_encrypt.0); - if !matches!(e, Err(Error::NoRoom { .. })) { - self.seq_encrypt += 1; - } + self.seq_encrypt += 1; e } From 2e040b31c16915f1f1ea08c546a19487fc389398 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 23:41:13 +0800 Subject: [PATCH 02/16] Don't use can_output() for cli.auth.progress() We'll assume that KEX isn't going to run during auth, and the buffer space is adequate. This can be revisited if needed. --- src/conn.rs | 8 ++------ src/traffic.rs | 11 ----------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 87ee42a..3c60b6a 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -292,12 +292,8 @@ impl Conn { } } ConnState::PreAuth => { - // TODO. need to figure how we'll do "unbounded" responses - // and backpressure. can_output() should have a size check? - if s.can_output() { - if let Some(cli) = self.try_mut_client() { - disp.event = cli.auth.progress(); - } + if let Some(cli) = self.try_mut_client() { + disp.event = cli.auth.progress(); } // send userauth request } diff --git a/src/traffic.rs b/src/traffic.rs index 1b576d7..ebeb31a 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -444,13 +444,6 @@ impl<'a> TrafOut<'a> { matches!(self.state, TxState::Write { .. }) } - /// A simple test if a packet can be sent. `send_allowed` should be used - /// for more general situations - pub fn can_output(&self) -> bool { - // TODO don't use this - true - } - /// Returns payload space available to send a packet. Returns 0 if not ready or full pub fn send_allowed(&self, keys: &KeyState) -> usize { // TODO: test for full output buffer @@ -546,10 +539,6 @@ impl<'s, 'a> TrafSend<'s, 'a> { self.out.send_version() } - pub fn can_output(&self) -> bool { - self.out.can_output() - } - /// Returns the current receive sequence number pub fn recv_seq(&self) -> u32 { self.keys.seq_decrypt.0 From 41eb9b4b22c25f650a2b762e144ac8480a8df3da Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 23:35:09 +0800 Subject: [PATCH 03/16] Add TrafSend drain functions Can be used to avoid sending data and let the output buffer flush --- src/runner.rs | 6 ++++++ src/traffic.rs | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/runner.rs b/src/runner.rs index 8e9360d..ed85e39 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -608,6 +608,12 @@ impl<'a, CS: CliServ> Runner<'a, CS> { return Ok(Some(0)); } + if self.traf_out.is_draining() { + // Continual channel data could prevent drain from completing, + // so disallow channel writes when draining. + return Ok(Some(0)); + } + // minimum of buffer space and channel window available let payload_space = self.traf_out.send_allowed(&self.keys); // subtract space for packet headers prior to data diff --git a/src/traffic.rs b/src/traffic.rs index ebeb31a..aaa2238 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -356,6 +356,8 @@ pub(crate) struct TrafOut<'a> { /// in-place as they are written to `buf`. buf: SliceOrVec<'a>, state: TxState, + + drain: bool, } /// State machine for writes @@ -395,7 +397,7 @@ impl TrafIn<'static> { impl<'a> TrafOut<'a> { pub fn new(buf: &'a mut [u8]) -> Self { - Self { buf: SliceOrVec::Borrowed(buf), state: TxState::Idle } + Self { buf: SliceOrVec::Borrowed(buf), state: TxState::Idle, drain: false } } /// Serializes and and encrypts a packet to send @@ -501,6 +503,16 @@ impl<'a> TrafOut<'a> { pub fn sender<'s>(&'s mut self, keys: &'s mut KeyState) -> TrafSend<'s, 'a> { TrafSend::new(self, keys) } + + /// Return whether output is draining. + /// + /// Used to determine whether to initiate outbound traffic, such as channel writes. + /// Generally immediate responses to incoming messages should still be sent + /// even when draining. Otherwise they would need to be put in deferred_packets + /// which may run out. + pub fn is_draining(&self) -> bool { + self.drain + } } #[cfg(feature = "alloc")] @@ -547,4 +559,26 @@ impl<'s, 'a> TrafSend<'s, 'a> { pub fn enable_strict_kex(&mut self) { self.keys.enable_strict_kex(); } + + /// Set TrafOut to start draining output. + /// + /// Only one caller/area should be using set_drain_output() at a time. + /// For `TrafOut` itself there isn't a problem with multiple + /// callers enabling/disabling drain, but it could result in races + /// between callers. Only kex should be using it currently, so + /// there is a debug_assert! to that effect. + pub fn set_drain_output(&mut self, drain: bool) { + debug_assert!(drain != self.out.drain, "set_drain_output() dupe"); + self.out.drain = drain; + } + + /// Test if output buffer is empty. + /// + /// Fails if `set_drain_output(true)` wasn't set (debug panic) + /// This isn't inherent, but helps catch misuse (see comment + /// for set_drain_output()). + pub fn is_drained(&self) -> bool { + debug_assert!(self.out.drain); + matches!(self.out.state, TxState::Idle) + } } From fb58b0a30ae4bfb6c1fcb81e4e9c852428ea3d3e Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 23:32:15 +0800 Subject: [PATCH 04/16] Add separate Kex::Start* states These states are when a KexInit needs to be sent but TrafOut isn't drained. --- src/conn.rs | 18 +++-- src/kex.rs | 197 ++++++++++++++++++++++++++++++++++++-------------- src/runner.rs | 2 +- 3 files changed, 157 insertions(+), 60 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 3c60b6a..2bae68e 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -32,6 +32,12 @@ pub(crate) struct Conn { cliserv: CS, + /// Algorithm preferences for KEX + /// + /// This must remain unmodified during a key exchange. + /// The same config will be serialised both for sending + /// and receiving kexinit, and possibly also for DelayedPacket + /// sending. algo_conf: AlgoConfig, parse_ctx: ParseContext, @@ -263,18 +269,19 @@ impl Conn { } /// Updates `ConnState` and sends any packets required to progress the connection state. - // TODO can this just move to the bottom of handle_payload(), and make module-private? pub(crate) fn progress( &mut self, s: &mut TrafSend, ) -> Result { + self.kex.progress(&self.algo_conf, s)?; + let mut disp = Dispatched::default(); match self.state { ConnState::SendIdent => { s.send_version()?; // send early to avoid round trip latency // TODO: first_follows would have a second packet here - self.kex.send_kexinit(&self.algo_conf, s)?; + self.kex.start_kexinit(s); disp.event = DispatchEvent::Progressed; self.state = ConnState::ReceiveIdent } @@ -362,7 +369,7 @@ impl Conn { error::SSHProto.fail() } } - } else if !matches!(self.kex, Kex::Idle | Kex::KexInit { .. }) { + } else if self.kex.is_receiving() { // Normal KEX only allows certain packets match p.category() { packets::Category::All => Ok(()), @@ -399,8 +406,9 @@ impl Conn { self.sess_id.is_none() } - pub fn kex_is_idle(&self) -> bool { - matches!(self.kex, Kex::Idle) + /// True if KexInit has not been sent. + pub fn is_kex_sending(&self) -> bool { + self.kex.is_sending() } pub fn dispatch_packet( diff --git a/src/kex.rs b/src/kex.rs index 7da4acb..2d49397 100644 --- a/src/kex.rs +++ b/src/kex.rs @@ -112,15 +112,30 @@ pub(crate) enum Kex { /// No key exchange in progress Idle, + /// Waiting for empty output buffer before starting a KEX + /// + /// Our KexInit will only be sent once the output buffer is empty, + /// to ensure subsequent sent kex packets will fit. + StartKexInit, + /// Waiting for a KexInit packet, have sent one. KexInit { // Cookie sent in our KexInit packet. Kept so that we can reproduce the // KexInit packet when calculating the exchange hash. our_cookie: KexCookie, }, + + /// Have received a KexInit, waiting for an empty output buffer to start a KEX. + /// + /// This is similar to SendKexInit state but keeps state + /// from the peer's KexInit. + StartKexDH { our_cookie: KexCookie, algos: Algos, kex_hash: KexHash }, + /// Waiting for KexDHInit (server) or KexDHReply (client) KexDH { algos: Algos, kex_hash: KexHash }, /// Waiting for NewKeys. `output` is new keys to take into use + /// + /// Our own NewKeys message has been sent. NewKeys { output: KexOutput, algos: Algos }, /// A transient state use internally to transition between other states. @@ -163,7 +178,7 @@ impl KexHash { let mut kh = KexHash { hash_ctx: Sha256::new() }; let remote_version = remote_version.version().trap()?; // Recreate our own kexinit packet to hash. - let own_kexinit = Kex::::make_kexinit(our_cookie, algo_conf); + let own_kexinit = make_kexinit(our_cookie, algo_conf); if CS::is_client() { kh.hash_slice(ident::OUR_VERSION); kh.hash_slice(remote_version); @@ -288,29 +303,88 @@ impl Algos { } } +impl KexCookie { + pub fn generate() -> Result { + let mut c = KexCookie([0; _]); + random::fill_random(&mut c.0)?; + Ok(c) + } +} + impl Kex { pub fn new() -> Self { Kex::Idle } + /// Start a kexinit. Must be called from Idle state. + pub fn start_kexinit(&mut self, s: &mut TrafSend) { + debug_assert!(matches!(self, Kex::Idle)); + s.set_drain_output(true); + *self = Kex::StartKexInit + } + fn take(&mut self) -> Self { debug_assert!(!matches!(self, Kex::Taken)); core::mem::replace(self, Kex::Taken) } - /// Sends a `KexInit` message. Must be called from `Idle` state - pub fn send_kexinit( - &mut self, - conf: &AlgoConfig, - s: &mut TrafSend, - ) -> Result<()> { - if !matches!(self, Kex::Idle) { - return Err(Error::bug()); + /// Test if a KEX is in progress. + /// + /// This is from a sending perspective. Returns true after KexInit + /// has been sent and before NewKeys has been sent. + /// During that interval non-KEX packets are disallowed. + pub fn is_sending(&self) -> bool { + matches!(self, Kex::KexInit { .. } | Kex::KexDH { .. }) + } + + /// Test if a KEX is in progress from a receiving perspective. + /// + /// true after KexInit has been received and before NewKeys has been received. + pub fn is_receiving(&self) -> bool { + matches!( + self, + Kex::KexDH { .. } | Kex::StartKexDH { .. } | Kex::NewKeys { .. } + ) + } + + pub fn progress(&mut self, conf: &AlgoConfig, s: &mut TrafSend) -> Result<()> { + // Send KexInit if TrafOut has drained. + // TODO: It isn't necessary for TrafOut to drain entirely, it would be OK + // to instead check that it has adequate space for all the packets + // that would be sent in a KEX sequence. + trace!("{self:?}"); + match self { + Kex::Idle => { + // TODO run a rekey if needed. + } + Kex::StartKexInit => { + if s.is_drained() { + s.set_drain_output(false); + let our_cookie = KexCookie::generate()?; + s.send(make_kexinit(&our_cookie, conf))?; + *self = Kex::KexInit { our_cookie }; + } + } + Kex::StartKexDH { .. } => { + if s.is_drained() { + s.set_drain_output(false); + let Kex::StartKexDH { our_cookie, mut algos, kex_hash } = + self.take() + else { + unreachable!(); + }; + s.send(make_kexinit(&our_cookie, conf))?; + if CS::is_client() { + algos.kex.send_kexdhinit(s)?; + } + *self = Kex::KexDH { algos, kex_hash }; + } + } + Kex::KexInit { .. } | Kex::KexDH { .. } | Kex::NewKeys { .. } => { + // waiting for incoming packets, no progress + } + Kex::Taken => return Err(Error::bug()), } - let mut our_cookie = KexCookie([0u8; 16]); - random::fill_random(our_cookie.0.as_mut_slice())?; - s.send(Self::make_kexinit(&our_cookie, conf))?; - *self = Kex::KexInit { our_cookie }; Ok(()) } @@ -322,18 +396,18 @@ impl Kex { first_kex: bool, s: &mut TrafSend, ) -> Result<()> { - // Reply if we haven't already received one. This will bump the state to Kex::KexInit - if let Kex::Idle = self { - self.send_kexinit(algo_conf, s)?; - } - - let our_cookie = if let Kex::KexInit { ref our_cookie } = self { - our_cookie - } else { - // already received a KexInit - return error::PacketWrong.fail(); + let our_cookie = match self { + Kex::KexInit { our_cookie } => our_cookie.clone(), + Kex::Idle | Kex::StartKexInit => KexCookie::generate()?, + _ => { + // already received a KexInit + return error::PacketWrong.fail(); + } }; + // start is set when a KexInit hasn't yet been sent. + let start = matches!(self, Kex::Idle | Kex::StartKexInit); + let mut algos = Self::algo_negotiation(&remote_kexinit, algo_conf)?; debug!("{algos}"); @@ -341,9 +415,6 @@ impl Kex { debug!("kexinit has strict kex but wasn't first packet"); return error::PacketWrong.fail(); } - if CS::is_client() { - algos.kex.send_kexdhinit(s)?; - } if first_kex && algos.strict_kex { // strict-kex in initial KEXINIT enables it s.enable_strict_kex(); @@ -351,31 +422,24 @@ impl Kex { let kex_hash = KexHash::new::( algo_conf, - our_cookie, + &our_cookie, remote_version, &remote_kexinit.into(), )?; - *self = Kex::KexDH { algos, kex_hash }; - Ok(()) - } - fn make_kexinit<'a>(cookie: &KexCookie, conf: &'a AlgoConfig) -> Packet<'a> { - packets::KexInit { - cookie: cookie.clone(), - kex: (&conf.kexs).into(), - hostsig: (&conf.hostsig).into(), - cipher_c2s: (&conf.ciphers).into(), - cipher_s2c: (&conf.ciphers).into(), - mac_c2s: (&conf.macs).into(), - mac_s2c: (&conf.macs).into(), - comp_c2s: (&conf.comps).into(), - comp_s2c: (&conf.comps).into(), - lang_c2s: NameList::empty(), - lang_s2c: NameList::empty(), - first_follows: false, - reserved: 0, - } - .into() + *self = if start { + if matches!(self, Kex::Idle) { + s.set_drain_output(true); + } + // client kexdhinit will be sent after KexInit is sent. + Kex::StartKexDH { our_cookie, algos, kex_hash } + } else { + if CS::is_client() { + algos.kex.send_kexdhinit(s)?; + } + Kex::KexDH { algos, kex_hash } + }; + Ok(()) } pub fn handle_newkeys( @@ -514,11 +578,12 @@ impl Kex { } pub fn is_strict(&self) -> bool { - matches!( - self, - Kex::KexDH { algos: Algos { strict_kex: true, .. }, .. } - | Kex::NewKeys { algos: Algos { strict_kex: true, .. }, .. } - ) + match self { + Kex::StartKexDH { algos, .. } + | Kex::KexDH { algos, .. } + | Kex::NewKeys { algos, .. } => algos.strict_kex, + _ => false, + } } pub fn handle_kexdhreply(&self) -> Result { @@ -526,6 +591,7 @@ impl Kex { trace!("kexdhreply not client"); return error::SSHProto.fail(); } + if !matches!(self, Kex::KexDH { .. }) { return error::PacketWrong.fail(); } @@ -778,6 +844,25 @@ impl SharedSecret { } } +pub fn make_kexinit<'a>(cookie: &KexCookie, conf: &'a AlgoConfig) -> Packet<'a> { + packets::KexInit { + cookie: cookie.clone(), + kex: (&conf.kexs).into(), + hostsig: (&conf.hostsig).into(), + cipher_c2s: (&conf.ciphers).into(), + cipher_s2c: (&conf.ciphers).into(), + mac_c2s: (&conf.macs).into(), + mac_s2c: (&conf.macs).into(), + comp_c2s: (&conf.comps).into(), + comp_s2c: (&conf.comps).into(), + lang_c2s: NameList::empty(), + lang_s2c: NameList::empty(), + first_follows: false, + reserved: 0, + } + .into() +} + // TODO ZeroizeOnDrop. Sha256 doesn't support it yet. // https://github.com/RustCrypto/hashes/issues/87 pub(crate) struct KexOutput { @@ -1213,8 +1298,12 @@ mod tests { let mut cli = kex::Kex::new(); let mut serv = kex::Kex::new(); - serv.send_kexinit(&serv_conf, &mut ts.sender()).unwrap(); - cli.send_kexinit(&cli_conf, &mut tc.sender()).unwrap(); + serv.start_kexinit(&mut ts.sender()); + serv.progress(&serv_conf, &mut ts.sender()).unwrap(); + cli.start_kexinit(&mut tc.sender()); + cli.progress(&cli_conf, &mut tc.sender()).unwrap(); + assert!(matches!(serv, Kex::KexInit { .. })); + assert!(matches!(cli, Kex::KexInit { .. })); let cli_init = tc.next().unwrap(); let cli_init = if let Packet::KexInit(k) = cli_init { k } else { panic!() }; diff --git a/src/runner.rs b/src/runner.rs index ed85e39..0c25461 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -602,7 +602,7 @@ impl<'a, CS: CliServ> Runner<'a, CS> { // Avoid apps polling forever on a packet type that won't come dt.validate_send(CS::is_client())?; - if !self.conn.kex_is_idle() { + if self.conn.is_kex_sending() { // Only KEX messages are allowed during key exchange, // not data. return Ok(Some(0)); From f00a7e0c64fa1d63554a29da00e6a5c14c765c51 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 00:27:37 +0800 Subject: [PATCH 05/16] Add Clone to Packet variants --- src/packets.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/packets.rs b/src/packets.rs index 4b3cb53..b975bce 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -81,15 +81,15 @@ pub struct DebugPacket<'a> { pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct Disconnect<'a> { pub reason: u32, pub desc: TextString<'a>, - pub lang: TextString<'a>, + pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct Unimplemented { pub seq: u32, @@ -115,7 +115,7 @@ pub struct ServiceRequest<'a> { pub name: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ServiceAccept<'a> { pub name: &'a str, @@ -312,7 +312,7 @@ pub struct UserauthFailure<'a> { pub partial: bool, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct UserauthSuccess {} @@ -672,7 +672,7 @@ pub enum GlobalRequestMethod<'a> { // pub port: u32, // } -#[derive(Debug, SSHEncode)] +#[derive(Debug, SSHEncode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[sshwire(no_variant_names)] pub enum RequestSuccess { @@ -699,7 +699,7 @@ impl<'de> SSHDecode<'de> for RequestSuccess { // pub port: u32, // } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct RequestFailure {} @@ -731,7 +731,7 @@ pub enum ChannelOpenType<'a> { Unknown(Unknown<'a>), } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelOpenConfirmation { pub num: u32, @@ -740,7 +740,7 @@ pub struct ChannelOpenConfirmation { pub max_packet: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelOpenFailure<'a> { pub num: u32, @@ -749,7 +749,7 @@ pub struct ChannelOpenFailure<'a> { pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelWindowAdjust { pub num: u32, @@ -781,25 +781,25 @@ impl ChannelDataExt<'_> { pub const DATA_OFFSET: usize = 13; } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelEof { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelClose { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelSuccess { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelFailure { pub num: u32, From 48a518e726bd6ec3a2597b095c2e0ba11e50fc46 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 00:33:03 +0800 Subject: [PATCH 06/16] Add deferred packet queue A subset of Packet variants can be enqueued to a separate deferred packet queue. This variant is a lot smaller than Packet. The deferred packets need to be kept non-encrypted since they will be sent out-of-order with KEX packets that might continue being sent. --- src/error.rs | 19 ++++-- src/runner.rs | 21 ++++++ src/server.rs | 1 + src/sshwire.rs | 4 ++ src/traffic.rs | 176 +++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 212 insertions(+), 9 deletions(-) diff --git a/src/error.rs b/src/error.rs index e543b70..7e542a7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,7 @@ use core::fmt::Arguments; use snafu::prelude::*; -use crate::channel::ChanNum; +use crate::{channel::ChanNum, packets::MessageNumber}; #[cfg(feature = "backtrace")] use snafu::Backtrace; @@ -20,14 +20,21 @@ use snafu::Backtrace; // TODO: maybe split this into a list of public vs private errors? #[snafu(visibility(pub))] pub enum Error { - /// Output buffer ran out of room - NoRoom { + /// Input buffer ran out + RanOut { #[cfg(feature = "backtrace")] backtrace: Backtrace, }, - /// Input buffer ran out - RanOut { + /// Can't currently send a packet + /// + /// Either output buffer is full, or a key exchange is in progress. + // Should only be returned from TrafOut::send_packet(), + // NoRoom is for other similar circumstances. + BusySend { packet: MessageNumber }, + + /// No room to write + NoRoom { #[cfg(feature = "backtrace")] backtrace: Backtrace, }, @@ -157,6 +164,8 @@ pub enum Error { // #[snafu(display("Program bug {location}"))] // Bug { location: snafu::Location }, /// Program bug + /// + /// Hitting this may instead panic on builds with debug-assertions. Bug, } diff --git a/src/runner.rs b/src/runner.rs index 0c25461..0c88f90 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -317,6 +317,21 @@ impl<'a, CS: CliServ> Runner<'a, CS> { self.traf_in.done_payload(); } + // Try moving packets from the deferred queue to the + // normal output. Can't happen during kex since non-kex + // packets are disallowed. + // KEX packets aren't included in the deferred list. + if !self.conn.is_kex_sending() { + match self.traf_out.send_deferred_packets(&mut self.keys) { + Ok(()) => (), + Err(Error::NoRoom { .. }) => { + // try again once there's space + return Ok(Event::None); + } + Err(e) => return Err(e), + } + } + let mut disp = Dispatched::default(); // Handle incoming packets @@ -614,6 +629,12 @@ impl<'a, CS: CliServ> Runner<'a, CS> { return Ok(Some(0)); } + if self.traf_out.have_deferred_packets() { + // Let deferred packets get moved to the output queue + // before sending more channel data. + return Ok(Some(0)); + } + // minimum of buffer space and channel window available let payload_space = self.traf_out.send_allowed(&self.keys); // subtract space for packet headers prior to data diff --git a/src/server.rs b/src/server.rs index 7018dde..24c2574 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,6 +22,7 @@ impl Server { s: &mut TrafSend, ) -> Result<()> { let success = match p.name { + // Any matched names here must also be in static_service_name() SSH_SERVICE_USERAUTH => true, SSH_SERVICE_CONNECTION => self.auth.is_authed(), _ => false, diff --git a/src/sshwire.rs b/src/sshwire.rs index f9a08d1..f0ced13 100644 --- a/src/sshwire.rs +++ b/src/sshwire.rs @@ -323,6 +323,10 @@ impl SSHEncode for heapless::String { pub struct TextString<'a>(pub &'a [u8]); impl<'a> TextString<'a> { + pub fn new() -> TextString<'static> { + TextString(&[]) + } + /// Returns the UTF-8 decoded string, using [`core::str::from_utf8`] /// /// Don't call this if you are avoiding including UTF-8 routines in diff --git a/src/traffic.rs b/src/traffic.rs index aaa2238..d6e6426 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -11,11 +11,20 @@ use zeroize::{Zeroize, ZeroizeOnDrop}; #[cfg(feature = "alloc")] use alloc::boxed::Box; +use heapless::Deque; -use crate::channel::{ChanData, ChanNum}; use crate::encrypt::{KeyState, KeysRecv, KeysSend, SSH_PAYLOAD_START}; use crate::ident::RemoteVersion; use crate::*; +use crate::{ + channel::{ChanData, ChanNum}, + packets::Packet, +}; + +/// Number of `DeferredPacket`s to queue. +/// +/// Each entry takes around 40 bytes. +const DEFER_COUNT: usize = 10; // Either a slice or boxed array. // Similar to managed::ManagedSlice. @@ -358,6 +367,8 @@ pub(crate) struct TrafOut<'a> { state: TxState, drain: bool, + + deferred_packets: Deque, } /// State machine for writes @@ -397,18 +408,69 @@ impl TrafIn<'static> { impl<'a> TrafOut<'a> { pub fn new(buf: &'a mut [u8]) -> Self { - Self { buf: SliceOrVec::Borrowed(buf), state: TxState::Idle, drain: false } + Self { + buf: SliceOrVec::Borrowed(buf), + state: TxState::Idle, + drain: false, + deferred_packets: Deque::new(), + } } /// Serializes and and encrypts a packet to send + /// + /// If the output buffer is full or a rekey is in progress, the + /// packet will be enqueued to be sent later. If the deferred packet queue + /// is full, `NoRoom` will be returned. + /// + /// `BusySend` error is recoverable, others should be treated as fatal. pub(crate) fn send_packet( &mut self, p: packets::Packet, keys: &mut KeyState, ) -> Result<()> { - // Sanity check + if self.deferred_packets.is_empty() { + // Send the packet normally if it fits + match self.send_packet_inner(&p, keys) { + // Defer it if noroom + Err(Error::NoRoom { .. }) => (), + res => return res, + } + } + + // NoRoom was returned, output buffer is full. + // Attempt to defer the packet. + + let pnum = p.message_num(); + trace!("Delay packet type {pnum:?}"); + + // Convert to a DeferredPacket if possible + let Ok(dp) = DeferredPacket::try_from(p) else { + // Packet type isn't expected to be deferred. + trace!("NoRoom packet type {pnum:?}"); + return error::BusySend { packet: pnum }.fail(); + }; + + self.deferred_packets.push_front(dp).map_err(|_| { + error!("No space to queue packet"); + trace!("NoRoom packet type {pnum:?}"); + error::BusySend { packet: pnum }.build() + }) + } + + /// Serializes and and encrypts a packet to send + /// + /// The packet will not be enqueued to the deferred queue. + /// This function should not usually be called directly. + /// + /// `BusySend` error is recoverable, others should be treated as fatal. + pub fn send_packet_inner( + &mut self, + p: &packets::Packet, + keys: &mut KeyState, + ) -> Result<()> { + // Check that packets are being encrypted match p.category() { - packets::Category::All | packets::Category::Kex => (), // OK cleartext + packets::Category::All | packets::Category::Kex => (), _ => { if keys.is_send_cleartext() { return Error::bug_msg("send cleartext"); @@ -441,6 +503,28 @@ impl<'a> TrafOut<'a> { Ok(()) } + pub fn send_deferred_packets(&mut self, keys: &mut KeyState) -> Result<()> { + while let Some(d) = self.deferred_packets.back() { + let p = Packet::from(d); + match self.send_packet_inner(&p, keys) { + Ok(()) => { + self.deferred_packets.pop_back(); + } + Err(Error::NoRoom { .. }) => { + // Can't progress, let the caller retry later + break; + } + Err(e) => return Err(e), + } + } + + Ok(()) + } + + pub fn have_deferred_packets(&self) -> bool { + !self.deferred_packets.is_empty() + } + pub fn is_output_pending(&self) -> bool { trace!("is_output_pending st {:?}", self.state); matches!(self.state, TxState::Write { .. }) @@ -448,6 +532,12 @@ impl<'a> TrafOut<'a> { /// Returns payload space available to send a packet. Returns 0 if not ready or full pub fn send_allowed(&self, keys: &KeyState) -> usize { + if !self.deferred_packets.is_empty() { + // Don't allow sending packets when deferred ones are waiting. + // Otherwise the deferred queue will run out of room. + return 0; + } + // TODO: test for full output buffer match self.state { TxState::Write { len, .. } => keys.max_enc_payload(self.buf.len() - len), @@ -580,5 +670,83 @@ impl<'s, 'a> TrafSend<'s, 'a> { pub fn is_drained(&self) -> bool { debug_assert!(self.out.drain); matches!(self.out.state, TxState::Idle) + && self.out.deferred_packets.is_empty() + } +} + +/// Packet types that may be sent once a currently-NoRoom TrafOut clears. +/// +/// Rather than storing an entire `Packet<'static>`, keep a queue of smaller +/// `DeferredPacket`s. +/// +/// These packet types account for most packets that may be sent as responses +/// or from other asynchronous events (rekey packet count reached, as an example). +/// +/// These also queue packets to be sent while a KEX is in progress +/// (other packet types aren't allowed) +/// +/// Some packet types aren't included here since they're deferred via other +/// mechanisms: +/// +/// - ChannelWindowAdjust. Can be retried later. +/// - KEX packets. The output buffer is drained at the start of a KEX. +/// - Userauth - we hope it only occurs early when traffic is +/// well defined (no channels) and no KEXes are happening. +#[derive(Debug)] +pub enum DeferredPacket { + ChannelSuccess(packets::ChannelSuccess), + ChannelFailure(packets::ChannelFailure), + ChannelOpenFailure(packets::ChannelOpenFailure<'static>), + ChannelOpenConfirmation(packets::ChannelOpenConfirmation), + ChannelEof(packets::ChannelEof), + ChannelClose(packets::ChannelClose), + Unimplemented(packets::Unimplemented), + RequestFailure(packets::RequestFailure), + RequestSuccess(packets::RequestSuccess), +} + +impl DeferredPacket {} + +impl From<&DeferredPacket> for Packet<'static> { + fn from(d: &DeferredPacket) -> Self { + match d { + DeferredPacket::ChannelSuccess(p) => (p.clone()).into(), + DeferredPacket::ChannelFailure(p) => (p.clone()).into(), + DeferredPacket::ChannelOpenFailure(p) => (p.clone()).into(), + DeferredPacket::ChannelOpenConfirmation(p) => (p.clone()).into(), + DeferredPacket::ChannelEof(p) => (p.clone()).into(), + DeferredPacket::ChannelClose(p) => (p.clone()).into(), + DeferredPacket::Unimplemented(p) => (p.clone()).into(), + DeferredPacket::RequestFailure(p) => (p.clone()).into(), + DeferredPacket::RequestSuccess(p) => (p.clone()).into(), + } + } +} + +impl<'a> TryFrom> for DeferredPacket { + type Error = Error; + fn try_from(packet: Packet<'a>) -> Result { + Ok(match packet { + Packet::ChannelSuccess(p) => Self::ChannelSuccess(p), + Packet::ChannelFailure(p) => Self::ChannelFailure(p), + Packet::ChannelOpenConfirmation(p) => Self::ChannelOpenConfirmation(p), + Packet::ChannelEof(p) => Self::ChannelEof(p), + Packet::ChannelClose(p) => Self::ChannelClose(p), + Packet::Unimplemented(p) => Self::Unimplemented(p), + Packet::RequestFailure(p) => Self::RequestFailure(p), + Packet::RequestSuccess(p) => Self::RequestSuccess(p), + + Packet::ChannelOpenFailure(p) => { + Self::ChannelOpenFailure(packets::ChannelOpenFailure { + // empty desc for 'static + desc: TextString::new(), + lang: "", + ..p + }) + } + + // Unhandled types + _ => return error::SSHProtoUnsupported.fail(), + }) } } From 09d40c092ae3f56eacc60f21426ce0583477ab5a Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 23:28:52 +0800 Subject: [PATCH 07/16] Add Channels::progress(), use it for window adjust Window adjust will be skipped while KEX is in progress. --- src/channel.rs | 36 +++++++++++++++++++++++------------- src/conn.rs | 8 ++++++++ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 04c8ac5..3a57cf8 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -200,8 +200,7 @@ impl Channels { Ok(p) } - /// Informs the channel layer that an incoming packet has been read out, - /// so a window adjustment can be sent. + /// Informs the channel layer that an incoming packet has been read out. pub(crate) fn finished_read( &mut self, num: ChanNum, @@ -210,9 +209,7 @@ impl Channels { ) -> Result<()> { let ch = self.get_mut(num)?; ch.finished_input(len); - if let Some(w) = ch.check_window_adjust()? { - s.send(w)?; - } + ch.check_send_window_adjust(s); Ok(()) } @@ -232,6 +229,13 @@ impl Channels { self.get(num).is_ok_and(|c| c.valid_send(dt)) } + pub fn progress(&mut self, s: &mut TrafSend) -> DispatchEvent { + for ch in self.ch.iter_mut().filter_map(|c| c.as_mut()) { + ch.check_send_window_adjust(s); + } + DispatchEvent::None + } + /// Wake the channel with a ready input data packet. pub fn wake_read(&mut self, num: ChanNum, dt: ChanData, is_client: bool) { if let Ok(ch) = self.get_mut(num) { @@ -1030,16 +1034,22 @@ impl Channel { true } - /// Returns a window adjustment packet if required - fn check_window_adjust(&mut self) -> Result>> { - let num = self.send.as_mut().trap()?.num; + /// Send a window adjust packet if required. + fn check_send_window_adjust(&mut self, s: &mut TrafSend) { if self.pending_adjust > self.full_window / 2 { let adjust = self.pending_adjust as u32; - self.pending_adjust = 0; - let p = packets::ChannelWindowAdjust { num, adjust }.into(); - Ok(Some(p)) - } else { - Ok(None) + let Some(sdir) = self.send.as_mut() else { + return; + }; + let num = sdir.num; + let p = packets::ChannelWindowAdjust { num, adjust }; + match s.send(p) { + Ok(()) => self.pending_adjust = 0, + Err(Error::BusySend { .. }) => { + // Do nothing, the adjustment will be sent later. + } + Err(e) => debug_assert!(false, "Window adjust send failed {e:?}"), + } } } } diff --git a/src/conn.rs b/src/conn.rs index 2bae68e..0bd1a4f 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -275,6 +275,14 @@ impl Conn { ) -> Result { self.kex.progress(&self.algo_conf, s)?; + if !self.is_kex_sending() { + let event = self.channels.progress(s); + if !event.is_none() { + // TODO better Dispatched constructor + return Ok(Dispatched { event, disconnect: false }); + } + } + let mut disp = Dispatched::default(); match self.state { ConnState::SendIdent => { From b9ff0dff6597bbf9f13d7390a1c27bbfb14f8f98 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 22 May 2026 23:36:10 +0800 Subject: [PATCH 08/16] Emit CliSessionOpener from Channels::progress() Previously CliSessionOpener was emitted when an OpenConfirmation packet was received. That's a problem since it could occur while a KEX is in progress. Instead emit the event from progress() This could still encounter a full output queue - maybe the queue should be drained before emitting the event. In practise this is unlikely for single sessions. When TCP forwarding is added a similar problem will need to be solved, that will have greater chance of hitting full buffers. --- src/channel.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3a57cf8..e2622c5 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -232,6 +232,20 @@ impl Channels { pub fn progress(&mut self, s: &mut TrafSend) -> DispatchEvent { for ch in self.ch.iter_mut().filter_map(|c| c.as_mut()) { ch.check_send_window_adjust(s); + + if ch.open_confirmed { + ch.open_confirmed = false; + match ch.ty { + ChanType::Session => { + return DispatchEvent::CliEvent(CliEventId::SessionOpened( + ch.num(), + )); + } + ChanType::Tcp => { + trace!("TODO tcp channel") + } + } + } } DispatchEvent::None } @@ -402,17 +416,8 @@ impl Channels { window: p.initial_window as usize, }); - match ch.ty { - ChanType::Session => { - ev = DispatchEvent::CliEvent( - CliEventId::SessionOpened(ch.num()), - ); - } - ChanType::Tcp => { - trace!("TODO tcp channel") - } - } - + // A future progress() will notify the application. + ch.open_confirmed = true; ch.state = ChanState::Normal; } _ => { @@ -747,6 +752,12 @@ pub(crate) struct Channel { full_window: usize, + /// Set when Open Confirmation is received. + /// + /// A subsequent `progress()` will emit a `SessionOpened` event + /// for the application to handle. + open_confirmed: bool, + /// Set once application has called `done()`. The channel /// will only be removed from the list /// (allowing channel number re-use) if `app_done` is set @@ -776,6 +787,7 @@ impl Channel { send: None, pending_adjust: 0, full_window: config::DEFAULT_WINDOW, + open_confirmed: false, app_done: false, read_waker: None, write_waker: None, From 71e86a951d870733aaeacd72d4725da013c8d2e1 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Sat, 23 May 2026 22:19:27 +0800 Subject: [PATCH 09/16] During KEX let the output queue drain before input --- src/runner.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/runner.rs b/src/runner.rs index 0c88f90..d5d6dd8 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -420,8 +420,20 @@ impl<'a, CS: CliServ> Runner<'a, CS> { // Whether [`input()`](input) is ready pub fn is_input_ready(&self) -> bool { - (self.conn.initial_sent() && self.traf_in.is_input_ready()) - || self.closed_input + if self.closed_input { + return true; + } + + // During KEX, let the output queue drain before accepting packets, + // so we can be sure not to hit NoRoom. + // kexdhinit/kexdhreply packets can be large. + // The deferred queue may still have packets since they + // can't be send during a KEX. + if self.conn.is_kex_sending() && self.traf_out.is_output_pending() { + return false; + } + + self.conn.initial_sent() && self.traf_in.is_input_ready() } /// Set a waker to be notified when [`input()`](Self::input) is ready to be called. From 040f9dcb546bcfaf4a503fa7abbe3ec4e0f8a618 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Sat, 23 May 2026 22:28:42 +0800 Subject: [PATCH 10/16] Track kex state and use it to defer packets Non-KEX packets can't be sent during a key exchange, so put them on the deferred queue. --- src/traffic.rs | 72 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/src/traffic.rs b/src/traffic.rs index d6e6426..cc6e47a 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -368,6 +368,9 @@ pub(crate) struct TrafOut<'a> { drain: bool, + // Set between sending KexInit and sending NewKeys. + sending_kex: bool, + deferred_packets: Deque, } @@ -412,6 +415,7 @@ impl<'a> TrafOut<'a> { buf: SliceOrVec::Borrowed(buf), state: TxState::Idle, drain: false, + sending_kex: false, deferred_packets: Deque::new(), } } @@ -428,16 +432,25 @@ impl<'a> TrafOut<'a> { p: packets::Packet, keys: &mut KeyState, ) -> Result<()> { - if self.deferred_packets.is_empty() { - // Send the packet normally if it fits + let is_kex = matches!(p.category(), packets::Category::Kex); + + if is_kex || (self.deferred_packets.is_empty() && !self.sending_kex) { + // Send the packet normally if it fits. + // KEX packets can be sent even if other packets are deferred + // (KEX packets don't get deferred themselves). + // A KexInit is only sent when there are no deferred packets, + // so we don't need to worry about incorrect reordering. match self.send_packet_inner(&p, keys) { - // Defer it if noroom - Err(Error::NoRoom { .. }) => (), + Err(Error::NoRoom { .. }) => { + debug_assert!(!is_kex, "KEX packets should have room"); + // non-kex packets get deferred + } res => return res, } } - // NoRoom was returned, output buffer is full. + // Either it didn't fit (NoRoom), or the deferred queue + // is already in use so we need to enqueue after that. // Attempt to defer the packet. let pnum = p.message_num(); @@ -457,18 +470,14 @@ impl<'a> TrafOut<'a> { }) } - /// Serializes and and encrypts a packet to send - /// - /// The packet will not be enqueued to the deferred queue. - /// This function should not usually be called directly. - /// - /// `BusySend` error is recoverable, others should be treated as fatal. - pub fn send_packet_inner( + // Check some invariants, and track whether we're sending KEX. + fn track_send_packet( &mut self, p: &packets::Packet, keys: &mut KeyState, ) -> Result<()> { // Check that packets are being encrypted + // This is checked in release and debug. match p.category() { packets::Category::All | packets::Category::Kex => (), _ => { @@ -478,6 +487,45 @@ impl<'a> TrafOut<'a> { } } + // KEX send packet catetory checked in debug builds for fuzzing. + if self.sending_kex { + // strict kex is ignored since we don't have access to + // conn.kex. + debug_assert!(matches!( + p.category(), + packets::Category::All | packets::Category::Kex + )); + } + + // Track KEX sending state + match p { + Packet::KexInit(_) => { + debug_assert!(!self.sending_kex); + self.sending_kex = true; + } + Packet::NewKeys(_) => { + debug_assert!(self.sending_kex); + self.sending_kex = false; + } + _ => (), + } + + Ok(()) + } + + /// Serializes and and encrypts a packet to send + /// + /// The packet will not be enqueued to the deferred queue. + /// This function should not usually be called directly. + /// + /// `NoRoom` error is recoverable, others should be treated as fatal. + pub fn send_packet_inner( + &mut self, + p: &packets::Packet, + keys: &mut KeyState, + ) -> Result<()> { + self.track_send_packet(p, keys)?; + // Either a fresh buffer or appending to write let (idx, len) = match self.state { TxState::Idle => (0, 0), From 53e2790566bbdf25384734c3d3d7824fff7352f7 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Sat, 23 May 2026 21:32:54 +0800 Subject: [PATCH 11/16] Rekey when packet/block count is reached Rekeying has two purposes. It avoids the packet sequence counter wrapping (32 bit), and avoids too many blocks being encrypted with the same key (applicable to AES). --- src/conn.rs | 2 -- src/encrypt.rs | 45 +++++++++++++++++++++++++++++++++++++++++---- src/kex.rs | 4 +++- src/traffic.rs | 4 ++++ 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 0bd1a4f..5f94ded 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -318,8 +318,6 @@ impl Conn { } trace!("-> {:?}, {disp:?}", self.state); - // TODO: if keys.seq > MAX_REKEY then we must rekey for security. - Ok(disp) } diff --git a/src/encrypt.rs b/src/encrypt.rs index 632506c..a82ffa8 100644 --- a/src/encrypt.rs +++ b/src/encrypt.rs @@ -11,7 +11,7 @@ use { use core::fmt; use core::fmt::Debug; -use core::num::Wrapping; +use core::num::{Saturating, Wrapping}; use aes::cipher::{BlockSizeUser, KeyIvInit, KeySizeUser, StreamCipher}; use hmac::Mac; @@ -38,6 +38,15 @@ const MAX_IV_LEN: usize = 32; /// Largest is chacha. Also applies to MAC keys const MAX_KEY_LEN: usize = 64; +// RFC4344. Ensure that rekey occurs at least every 2**31 blocks encrypted, +// and to avoid the 32-bit sequence counter wrapping. +// This is every 32GB transferred (at a 16 byte block size for AES). . +const REKEY_BLOCKS_OUT_LIMIT: u32 = 0x8000_0000; +// Receive limit is slightly higher, to avoid chance of both peers hitting the limit +// simultaneously, resulting in a needless (but harmless) second rekey. +const REKEY_BLOCKS_IN_LIMIT: u32 = 0x8100_0000; +const REKEY_BLOCK_SIZE: usize = 16; + /// Stateful [`Keys`], stores a sequence number as well, a single instance /// is kept for the entire session. #[derive(Debug)] @@ -49,6 +58,10 @@ pub(crate) struct KeyState { pub seq_encrypt: Wrapping, pub seq_decrypt: Wrapping, strict_kex: bool, + + // Count of 16-byte blocks, for rekeying. + rekey_blocks_out: Saturating, + rekey_blocks_in: Saturating, } impl KeyState { @@ -60,6 +73,8 @@ impl KeyState { seq_encrypt: Wrapping(0), seq_decrypt: Wrapping(0), strict_kex: false, + rekey_blocks_out: Saturating(0), + rekey_blocks_in: Saturating(0), } } @@ -67,6 +82,11 @@ impl KeyState { matches!(self.enc.cipher, EncKey::NoCipher) } + pub fn is_rekey_needed(&self) -> bool { + self.rekey_blocks_in.0 > REKEY_BLOCKS_IN_LIMIT + || self.rekey_blocks_out.0 > REKEY_BLOCKS_OUT_LIMIT + } + pub fn enable_strict_kex(&mut self) { self.strict_kex = true; } @@ -77,6 +97,7 @@ impl KeyState { if self.strict_kex { self.seq_encrypt = Wrapping(0); } + self.rekey_blocks_out = Saturating(0); } /// Updates with keys for receiving. @@ -93,6 +114,7 @@ impl KeyState { if self.strict_kex { self.seq_decrypt = Wrapping(0); } + self.rekey_blocks_in = Saturating(0); } pub fn recv_seq(&self) -> u32 { @@ -113,6 +135,9 @@ impl KeyState { pub fn decrypt(&mut self, buf: &mut [u8]) -> Result { let e = self.dec.decrypt(buf, self.seq_decrypt.0); self.seq_decrypt += 1; + if let Ok(payload_len) = e { + self.rekey_blocks_in += payload_len.div_ceil(REKEY_BLOCK_SIZE) as u32; + } e } @@ -120,14 +145,21 @@ impl KeyState { /// /// [`buf`] is the entire output buffer to encrypt in place. /// payload_len is the length of the payload portion - /// This is stateful, updating the sequence number. + /// + /// This is stateful, updating the sequence numbers. + /// If `NoRoom` is returned buf and state will be left unmodified to allow a retry. pub fn encrypt( &mut self, payload_len: usize, buf: &mut [u8], ) -> Result { let e = self.enc.encrypt(payload_len, buf, self.seq_encrypt.0); - self.seq_encrypt += 1; + + // NoRoom failure allows retries. + if !matches!(e, Err(Error::NoRoom { .. })) { + self.seq_encrypt += 1; + self.rekey_blocks_out += buf.len().div_ceil(REKEY_BLOCK_SIZE) as u32; + } e } @@ -242,6 +274,10 @@ impl KeysSend { /// Encrypt a buffer in-place, adding packet size, padding, MAC etc. /// Returns the total length. /// Ensures that the packet meets minimum and other length requirements. + /// + /// `buf` and internal state will be left unmodified if + // `Err(Error::NoRoom)` is returned, allowing a subsequent retry. + /// All other returned errors must be treated as fatal to the session. fn encrypt( &mut self, payload_len: usize, @@ -260,8 +296,9 @@ impl KeysSend { debug_assert_eq!(len % size_block, 0); }; + // buf and internal state must not be modified prior to returning NoRoom if len + size_integ > buf.len() { - error!("Output buffer {} is too small for packet", buf.len()); + trace!("Output buffer {} is too small for packet", buf.len()); return error::NoRoom.fail(); } diff --git a/src/kex.rs b/src/kex.rs index 2d49397..bd3dc00 100644 --- a/src/kex.rs +++ b/src/kex.rs @@ -355,7 +355,9 @@ impl Kex { trace!("{self:?}"); match self { Kex::Idle => { - // TODO run a rekey if needed. + if s.is_rekey_needed() { + self.start_kexinit(s); + } } Kex::StartKexInit => { if s.is_drained() { diff --git a/src/traffic.rs b/src/traffic.rs index cc6e47a..145d757 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -698,6 +698,10 @@ impl<'s, 'a> TrafSend<'s, 'a> { self.keys.enable_strict_kex(); } + pub fn is_rekey_needed(&self) -> bool { + self.keys.is_rekey_needed() + } + /// Set TrafOut to start draining output. /// /// Only one caller/area should be using set_drain_output() at a time. From 7221373526287b3d7c69eb3b8b7d97343a4e94c9 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 25 May 2026 00:03:13 +0800 Subject: [PATCH 12/16] Comment write_channel_ready() vs channel_wake_write() No functional changes needed, comment the cases involved so that future changes can take note. --- src/runner.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/runner.rs b/src/runner.rs index d5d6dd8..720a43c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -629,18 +629,25 @@ impl<'a, CS: CliServ> Runner<'a, CS> { // Avoid apps polling forever on a packet type that won't come dt.validate_send(CS::is_client())?; + // When write_channel_ready() returns Some(0), a subsequent + // channel_wake_write() needs to occur. + + // channel_wake_write() after KexDone. if self.conn.is_kex_sending() { // Only KEX messages are allowed during key exchange, // not data. return Ok(Some(0)); } + // Drain only happens prior to KEX, so can be woken after KexDone. if self.traf_out.is_draining() { // Continual channel data could prevent drain from completing, // so disallow channel writes when draining. return Ok(Some(0)); } + // write_channel_ready() will happen after the deferred packets are + // moved to main queued then written. if self.traf_out.have_deferred_packets() { // Let deferred packets get moved to the output queue // before sending more channel data. From 1ec5493876a33f15810ad8c4cffd9d3a8c0b43a0 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 25 May 2026 20:36:13 +0800 Subject: [PATCH 13/16] "unsupported" field for BusySend, fuzzer catch it Some message types not handled by DeferredPacket aren't likely to be encountered in the wild. The fuzzer shouldn't worry about those. --- fuzz/src/common.rs | 7 +++++++ src/error.rs | 5 ++++- src/traffic.rs | 4 ++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/fuzz/src/common.rs b/fuzz/src/common.rs index 937dd77..a46eaff 100644 --- a/fuzz/src/common.rs +++ b/fuzz/src/common.rs @@ -114,8 +114,15 @@ where #[allow(unused)] fn check_error(r: Result<()>) { + use packets::MessageNumber::*; if let Err(e) = r { match e { + Error::BusySend { ref packet, unsupported: true } => match packet { + // Don't fail for userauth or service accept. In real operation + // they would happen early after KEX when there is space. + SSH_MSG_USERAUTH_SUCCESS | SSH_MSG_SERVICE_ACCEPT => (), + _ => panic!("Unexpected packet type for {e:#?}"), + }, // Errors that should not occur. // May indicate a bug in this fuzz harness. Error::BadChannel { .. } diff --git a/src/error.rs b/src/error.rs index 7e542a7..c8f3ca7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,9 +29,12 @@ pub enum Error { /// Can't currently send a packet /// /// Either output buffer is full, or a key exchange is in progress. + /// `unsupported` is set if it is not expected for Sunset to be + /// able to defer that type of packet (it may indicate a bug in Sunset, + //// or traffic that is unanticipated or difficult to handle). // Should only be returned from TrafOut::send_packet(), // NoRoom is for other similar circumstances. - BusySend { packet: MessageNumber }, + BusySend { packet: MessageNumber, unsupported: bool }, /// No room to write NoRoom { diff --git a/src/traffic.rs b/src/traffic.rs index 145d757..37d00e3 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -460,13 +460,13 @@ impl<'a> TrafOut<'a> { let Ok(dp) = DeferredPacket::try_from(p) else { // Packet type isn't expected to be deferred. trace!("NoRoom packet type {pnum:?}"); - return error::BusySend { packet: pnum }.fail(); + return error::BusySend { packet: pnum, unsupported: true }.fail(); }; self.deferred_packets.push_front(dp).map_err(|_| { error!("No space to queue packet"); trace!("NoRoom packet type {pnum:?}"); - error::BusySend { packet: pnum }.build() + error::BusySend { packet: pnum, unsupported: false }.build() }) } From 70b87b56382109187ae65d96c5f9dd2659f0143d Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 25 May 2026 20:48:34 +0800 Subject: [PATCH 14/16] Update honggfuzz to 0.5.60 No noticable changes --- Cargo.lock | 4 ++-- fuzz/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7187e5d..82e42ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1553,9 +1553,9 @@ dependencies = [ [[package]] name = "honggfuzz" -version = "0.5.58" +version = "0.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8319f3cc8fe416e7aa1ab95dcc04fd49f35397a47d0b2f0f225f6dba346a07" +checksum = "4d6510a410acedd7a7683b3a45dafdc5ccf3c72d6addaa373497005964fc4e23" dependencies = [ "arbitrary", "lazy_static", diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 9c2da34..25f89aa 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -honggfuzz = { version = "=0.5.58", optional = true } +honggfuzz = { version = "=0.5.60", optional = true } afl = { version = "*", optional = true } sunset = { workspace = true, features = ["arbitrary"] } sunset-sshwire-derive.workspace = true From 31746f43a885bc239fba7e7269571dbace052f4b Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 25 May 2026 23:22:58 +0800 Subject: [PATCH 15/16] Remove unused cli/serv checks Not needed now that we have the CS: CliServ parameter --- src/conn.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 5f94ded..70d0e80 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -600,8 +600,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.client()?; - let packet = self.packet(payload)?; if let Packet::KexDHReply(p) = packet { Ok(p.k_s.0) @@ -614,7 +612,6 @@ impl Conn { &mut self, payload: &'p [u8], ) -> Result> { - self.client()?; let packet = self.packet(payload)?; CliSessionExit::new(&packet) } @@ -623,7 +620,6 @@ impl Conn { &mut self, payload: &'p [u8], ) -> Result> { - self.client()?; if let Packet::UserauthBanner(b) = self.packet(payload)? { Ok(Banner(b)) } else { @@ -639,8 +635,6 @@ impl Conn { s: &mut TrafSend, keys: &[&SignKey], ) -> Result<()> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::KexDHInit(p) = packet { self.kex.resume_kexdhinit( @@ -659,8 +653,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::UserauthRequest(UserauthRequest { method: AuthMethod::Password(m), @@ -676,8 +668,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::UserauthRequest(UserauthRequest { method: AuthMethod::PubKey(m), From 58ba5a7b0bc4f900d2c73e55d54af87e9525ee3e Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Wed, 27 May 2026 23:15:55 +0800 Subject: [PATCH 16/16] Fix BadUsage if an error occurs in resume_servauth conn.resume_servauth() can fail if the send buffer is full. In that case the runner.resume_servauth() still needs to call resume_nocheck() so that the event is completed - otherwise on the next call to progress(), BadUsage will be emitted. This shouldn't generally happen (auth packets are assumed to be sent early with adequate buffer space), was caught by fuzzing. Further changes will be made later so that the BusySend can be returned from progress() (it's lost by the drop handler currently). --- src/runner.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/runner.rs b/src/runner.rs index 720a43c..7ae397e 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -229,13 +229,19 @@ impl<'a> Runner<'a, server::Server> { )); let mut s = self.traf_out.sender(&mut self.keys); - let ev = self.conn.resume_servauth(allow, &mut s)?; - self.set_extra_resume(ev); + let ev = self.conn.resume_servauth(allow, &mut s); + let r = match ev { + Ok(ev) => { + self.set_extra_resume(ev); + Ok(()) + } + Err(e) => Err(e), + }; // auth packets have passwords self.traf_in.zeroize_payload(); self.resume_nocheck(); - Ok(()) + r } pub(crate) fn resume_servauth_pkok(&mut self) -> Result<()> {