diff --git a/Cargo.lock b/Cargo.lock index 7187e5d..82e42ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1553,9 +1553,9 @@ dependencies = [ [[package]] name = "honggfuzz" -version = "0.5.58" +version = "0.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8319f3cc8fe416e7aa1ab95dcc04fd49f35397a47d0b2f0f225f6dba346a07" +checksum = "4d6510a410acedd7a7683b3a45dafdc5ccf3c72d6addaa373497005964fc4e23" dependencies = [ "arbitrary", "lazy_static", diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 9c2da34..25f89aa 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -honggfuzz = { version = "=0.5.58", optional = true } +honggfuzz = { version = "=0.5.60", optional = true } afl = { version = "*", optional = true } sunset = { workspace = true, features = ["arbitrary"] } sunset-sshwire-derive.workspace = true diff --git a/fuzz/src/common.rs b/fuzz/src/common.rs index 937dd77..a46eaff 100644 --- a/fuzz/src/common.rs +++ b/fuzz/src/common.rs @@ -114,8 +114,15 @@ where #[allow(unused)] fn check_error(r: Result<()>) { + use packets::MessageNumber::*; if let Err(e) = r { match e { + Error::BusySend { ref packet, unsupported: true } => match packet { + // Don't fail for userauth or service accept. In real operation + // they would happen early after KEX when there is space. + SSH_MSG_USERAUTH_SUCCESS | SSH_MSG_SERVICE_ACCEPT => (), + _ => panic!("Unexpected packet type for {e:#?}"), + }, // Errors that should not occur. // May indicate a bug in this fuzz harness. Error::BadChannel { .. } diff --git a/src/channel.rs b/src/channel.rs index 05fc06d..e2622c5 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -200,8 +200,7 @@ impl Channels { Ok(p) } - /// Informs the channel layer that an incoming packet has been read out, - /// so a window adjustment can be sent. + /// Informs the channel layer that an incoming packet has been read out. pub(crate) fn finished_read( &mut self, num: ChanNum, @@ -210,18 +209,7 @@ impl Channels { ) -> Result<()> { let ch = self.get_mut(num)?; ch.finished_input(len); - if let Some(w) = ch.check_window_adjust()? { - // The send buffer may be full. Ignore the failure and hope another adjustment is - // sent later. TODO improve this. - match s.send(w) { - Ok(_) => ch.pending_adjust = 0, - Err(Error::NoRoom { .. }) => { - // TODO better retry rather than hoping a retry occurs - debug!("noroom for adjustment") - } - error => return error, - } - } + ch.check_send_window_adjust(s); Ok(()) } @@ -241,6 +229,27 @@ impl Channels { self.get(num).is_ok_and(|c| c.valid_send(dt)) } + pub fn progress(&mut self, s: &mut TrafSend) -> DispatchEvent { + for ch in self.ch.iter_mut().filter_map(|c| c.as_mut()) { + ch.check_send_window_adjust(s); + + if ch.open_confirmed { + ch.open_confirmed = false; + match ch.ty { + ChanType::Session => { + return DispatchEvent::CliEvent(CliEventId::SessionOpened( + ch.num(), + )); + } + ChanType::Tcp => { + trace!("TODO tcp channel") + } + } + } + } + DispatchEvent::None + } + /// Wake the channel with a ready input data packet. pub fn wake_read(&mut self, num: ChanNum, dt: ChanData, is_client: bool) { if let Ok(ch) = self.get_mut(num) { @@ -407,17 +416,8 @@ impl Channels { window: p.initial_window as usize, }); - match ch.ty { - ChanType::Session => { - ev = DispatchEvent::CliEvent( - CliEventId::SessionOpened(ch.num()), - ); - } - ChanType::Tcp => { - trace!("TODO tcp channel") - } - } - + // A future progress() will notify the application. + ch.open_confirmed = true; ch.state = ChanState::Normal; } _ => { @@ -752,6 +752,12 @@ pub(crate) struct Channel { full_window: usize, + /// Set when Open Confirmation is received. + /// + /// A subsequent `progress()` will emit a `SessionOpened` event + /// for the application to handle. + open_confirmed: bool, + /// Set once application has called `done()`. The channel /// will only be removed from the list /// (allowing channel number re-use) if `app_done` is set @@ -781,6 +787,7 @@ impl Channel { send: None, pending_adjust: 0, full_window: config::DEFAULT_WINDOW, + open_confirmed: false, app_done: false, read_waker: None, write_waker: None, @@ -1039,17 +1046,22 @@ impl Channel { true } - /// Returns a window adjustment packet if required - /// - /// Does not reset the adjustment to 0, should be done by caller on successful send. - fn check_window_adjust(&self) -> Result>> { - let num = self.send.as_ref().trap()?.num; + /// Send a window adjust packet if required. + fn check_send_window_adjust(&mut self, s: &mut TrafSend) { if self.pending_adjust > self.full_window / 2 { let adjust = self.pending_adjust as u32; - let p = packets::ChannelWindowAdjust { num, adjust }.into(); - Ok(Some(p)) - } else { - Ok(None) + let Some(sdir) = self.send.as_mut() else { + return; + }; + let num = sdir.num; + let p = packets::ChannelWindowAdjust { num, adjust }; + match s.send(p) { + Ok(()) => self.pending_adjust = 0, + Err(Error::BusySend { .. }) => { + // Do nothing, the adjustment will be sent later. + } + Err(e) => debug_assert!(false, "Window adjust send failed {e:?}"), + } } } } diff --git a/src/conn.rs b/src/conn.rs index 87ee42a..70d0e80 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -32,6 +32,12 @@ pub(crate) struct Conn { cliserv: CS, + /// Algorithm preferences for KEX + /// + /// This must remain unmodified during a key exchange. + /// The same config will be serialised both for sending + /// and receiving kexinit, and possibly also for DelayedPacket + /// sending. algo_conf: AlgoConfig, parse_ctx: ParseContext, @@ -263,18 +269,27 @@ impl Conn { } /// Updates `ConnState` and sends any packets required to progress the connection state. - // TODO can this just move to the bottom of handle_payload(), and make module-private? pub(crate) fn progress( &mut self, s: &mut TrafSend, ) -> Result { + self.kex.progress(&self.algo_conf, s)?; + + if !self.is_kex_sending() { + let event = self.channels.progress(s); + if !event.is_none() { + // TODO better Dispatched constructor + return Ok(Dispatched { event, disconnect: false }); + } + } + let mut disp = Dispatched::default(); match self.state { ConnState::SendIdent => { s.send_version()?; // send early to avoid round trip latency // TODO: first_follows would have a second packet here - self.kex.send_kexinit(&self.algo_conf, s)?; + self.kex.start_kexinit(s); disp.event = DispatchEvent::Progressed; self.state = ConnState::ReceiveIdent } @@ -292,12 +307,8 @@ impl Conn { } } ConnState::PreAuth => { - // TODO. need to figure how we'll do "unbounded" responses - // and backpressure. can_output() should have a size check? - if s.can_output() { - if let Some(cli) = self.try_mut_client() { - disp.event = cli.auth.progress(); - } + if let Some(cli) = self.try_mut_client() { + disp.event = cli.auth.progress(); } // send userauth request } @@ -307,8 +318,6 @@ impl Conn { } trace!("-> {:?}, {disp:?}", self.state); - // TODO: if keys.seq > MAX_REKEY then we must rekey for security. - Ok(disp) } @@ -366,7 +375,7 @@ impl Conn { error::SSHProto.fail() } } - } else if !matches!(self.kex, Kex::Idle | Kex::KexInit { .. }) { + } else if self.kex.is_receiving() { // Normal KEX only allows certain packets match p.category() { packets::Category::All => Ok(()), @@ -403,8 +412,9 @@ impl Conn { self.sess_id.is_none() } - pub fn kex_is_idle(&self) -> bool { - matches!(self.kex, Kex::Idle) + /// True if KexInit has not been sent. + pub fn is_kex_sending(&self) -> bool { + self.kex.is_sending() } pub fn dispatch_packet( @@ -590,8 +600,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.client()?; - let packet = self.packet(payload)?; if let Packet::KexDHReply(p) = packet { Ok(p.k_s.0) @@ -604,7 +612,6 @@ impl Conn { &mut self, payload: &'p [u8], ) -> Result> { - self.client()?; let packet = self.packet(payload)?; CliSessionExit::new(&packet) } @@ -613,7 +620,6 @@ impl Conn { &mut self, payload: &'p [u8], ) -> Result> { - self.client()?; if let Packet::UserauthBanner(b) = self.packet(payload)? { Ok(Banner(b)) } else { @@ -629,8 +635,6 @@ impl Conn { s: &mut TrafSend, keys: &[&SignKey], ) -> Result<()> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::KexDHInit(p) = packet { self.kex.resume_kexdhinit( @@ -649,8 +653,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::UserauthRequest(UserauthRequest { method: AuthMethod::Password(m), @@ -666,8 +668,6 @@ impl Conn { &self, payload: &'f [u8], ) -> Result> { - self.server()?; - let packet = self.packet(payload)?; if let Packet::UserauthRequest(UserauthRequest { method: AuthMethod::PubKey(m), diff --git a/src/encrypt.rs b/src/encrypt.rs index ff43166..a82ffa8 100644 --- a/src/encrypt.rs +++ b/src/encrypt.rs @@ -11,7 +11,7 @@ use { use core::fmt; use core::fmt::Debug; -use core::num::Wrapping; +use core::num::{Saturating, Wrapping}; use aes::cipher::{BlockSizeUser, KeyIvInit, KeySizeUser, StreamCipher}; use hmac::Mac; @@ -38,6 +38,15 @@ const MAX_IV_LEN: usize = 32; /// Largest is chacha. Also applies to MAC keys const MAX_KEY_LEN: usize = 64; +// RFC4344. Ensure that rekey occurs at least every 2**31 blocks encrypted, +// and to avoid the 32-bit sequence counter wrapping. +// This is every 32GB transferred (at a 16 byte block size for AES). . +const REKEY_BLOCKS_OUT_LIMIT: u32 = 0x8000_0000; +// Receive limit is slightly higher, to avoid chance of both peers hitting the limit +// simultaneously, resulting in a needless (but harmless) second rekey. +const REKEY_BLOCKS_IN_LIMIT: u32 = 0x8100_0000; +const REKEY_BLOCK_SIZE: usize = 16; + /// Stateful [`Keys`], stores a sequence number as well, a single instance /// is kept for the entire session. #[derive(Debug)] @@ -49,6 +58,10 @@ pub(crate) struct KeyState { pub seq_encrypt: Wrapping, pub seq_decrypt: Wrapping, strict_kex: bool, + + // Count of 16-byte blocks, for rekeying. + rekey_blocks_out: Saturating, + rekey_blocks_in: Saturating, } impl KeyState { @@ -60,6 +73,8 @@ impl KeyState { seq_encrypt: Wrapping(0), seq_decrypt: Wrapping(0), strict_kex: false, + rekey_blocks_out: Saturating(0), + rekey_blocks_in: Saturating(0), } } @@ -67,6 +82,11 @@ impl KeyState { matches!(self.enc.cipher, EncKey::NoCipher) } + pub fn is_rekey_needed(&self) -> bool { + self.rekey_blocks_in.0 > REKEY_BLOCKS_IN_LIMIT + || self.rekey_blocks_out.0 > REKEY_BLOCKS_OUT_LIMIT + } + pub fn enable_strict_kex(&mut self) { self.strict_kex = true; } @@ -77,6 +97,7 @@ impl KeyState { if self.strict_kex { self.seq_encrypt = Wrapping(0); } + self.rekey_blocks_out = Saturating(0); } /// Updates with keys for receiving. @@ -93,6 +114,7 @@ impl KeyState { if self.strict_kex { self.seq_decrypt = Wrapping(0); } + self.rekey_blocks_in = Saturating(0); } pub fn recv_seq(&self) -> u32 { @@ -113,6 +135,9 @@ impl KeyState { pub fn decrypt(&mut self, buf: &mut [u8]) -> Result { let e = self.dec.decrypt(buf, self.seq_decrypt.0); self.seq_decrypt += 1; + if let Ok(payload_len) = e { + self.rekey_blocks_in += payload_len.div_ceil(REKEY_BLOCK_SIZE) as u32; + } e } @@ -120,15 +145,20 @@ impl KeyState { /// /// [`buf`] is the entire output buffer to encrypt in place. /// payload_len is the length of the payload portion - /// This is stateful, updating the sequence number. + /// + /// This is stateful, updating the sequence numbers. + /// If `NoRoom` is returned buf and state will be left unmodified to allow a retry. pub fn encrypt( &mut self, payload_len: usize, buf: &mut [u8], ) -> Result { let e = self.enc.encrypt(payload_len, buf, self.seq_encrypt.0); + + // NoRoom failure allows retries. if !matches!(e, Err(Error::NoRoom { .. })) { self.seq_encrypt += 1; + self.rekey_blocks_out += buf.len().div_ceil(REKEY_BLOCK_SIZE) as u32; } e } @@ -244,6 +274,10 @@ impl KeysSend { /// Encrypt a buffer in-place, adding packet size, padding, MAC etc. /// Returns the total length. /// Ensures that the packet meets minimum and other length requirements. + /// + /// `buf` and internal state will be left unmodified if + // `Err(Error::NoRoom)` is returned, allowing a subsequent retry. + /// All other returned errors must be treated as fatal to the session. fn encrypt( &mut self, payload_len: usize, @@ -262,8 +296,9 @@ impl KeysSend { debug_assert_eq!(len % size_block, 0); }; + // buf and internal state must not be modified prior to returning NoRoom if len + size_integ > buf.len() { - error!("Output buffer {} is too small for packet", buf.len()); + trace!("Output buffer {} is too small for packet", buf.len()); return error::NoRoom.fail(); } diff --git a/src/error.rs b/src/error.rs index e543b70..c8f3ca7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,7 @@ use core::fmt::Arguments; use snafu::prelude::*; -use crate::channel::ChanNum; +use crate::{channel::ChanNum, packets::MessageNumber}; #[cfg(feature = "backtrace")] use snafu::Backtrace; @@ -20,14 +20,24 @@ use snafu::Backtrace; // TODO: maybe split this into a list of public vs private errors? #[snafu(visibility(pub))] pub enum Error { - /// Output buffer ran out of room - NoRoom { + /// Input buffer ran out + RanOut { #[cfg(feature = "backtrace")] backtrace: Backtrace, }, - /// Input buffer ran out - RanOut { + /// Can't currently send a packet + /// + /// Either output buffer is full, or a key exchange is in progress. + /// `unsupported` is set if it is not expected for Sunset to be + /// able to defer that type of packet (it may indicate a bug in Sunset, + //// or traffic that is unanticipated or difficult to handle). + // Should only be returned from TrafOut::send_packet(), + // NoRoom is for other similar circumstances. + BusySend { packet: MessageNumber, unsupported: bool }, + + /// No room to write + NoRoom { #[cfg(feature = "backtrace")] backtrace: Backtrace, }, @@ -157,6 +167,8 @@ pub enum Error { // #[snafu(display("Program bug {location}"))] // Bug { location: snafu::Location }, /// Program bug + /// + /// Hitting this may instead panic on builds with debug-assertions. Bug, } diff --git a/src/kex.rs b/src/kex.rs index 7da4acb..bd3dc00 100644 --- a/src/kex.rs +++ b/src/kex.rs @@ -112,15 +112,30 @@ pub(crate) enum Kex { /// No key exchange in progress Idle, + /// Waiting for empty output buffer before starting a KEX + /// + /// Our KexInit will only be sent once the output buffer is empty, + /// to ensure subsequent sent kex packets will fit. + StartKexInit, + /// Waiting for a KexInit packet, have sent one. KexInit { // Cookie sent in our KexInit packet. Kept so that we can reproduce the // KexInit packet when calculating the exchange hash. our_cookie: KexCookie, }, + + /// Have received a KexInit, waiting for an empty output buffer to start a KEX. + /// + /// This is similar to SendKexInit state but keeps state + /// from the peer's KexInit. + StartKexDH { our_cookie: KexCookie, algos: Algos, kex_hash: KexHash }, + /// Waiting for KexDHInit (server) or KexDHReply (client) KexDH { algos: Algos, kex_hash: KexHash }, /// Waiting for NewKeys. `output` is new keys to take into use + /// + /// Our own NewKeys message has been sent. NewKeys { output: KexOutput, algos: Algos }, /// A transient state use internally to transition between other states. @@ -163,7 +178,7 @@ impl KexHash { let mut kh = KexHash { hash_ctx: Sha256::new() }; let remote_version = remote_version.version().trap()?; // Recreate our own kexinit packet to hash. - let own_kexinit = Kex::::make_kexinit(our_cookie, algo_conf); + let own_kexinit = make_kexinit(our_cookie, algo_conf); if CS::is_client() { kh.hash_slice(ident::OUR_VERSION); kh.hash_slice(remote_version); @@ -288,29 +303,90 @@ impl Algos { } } +impl KexCookie { + pub fn generate() -> Result { + let mut c = KexCookie([0; _]); + random::fill_random(&mut c.0)?; + Ok(c) + } +} + impl Kex { pub fn new() -> Self { Kex::Idle } + /// Start a kexinit. Must be called from Idle state. + pub fn start_kexinit(&mut self, s: &mut TrafSend) { + debug_assert!(matches!(self, Kex::Idle)); + s.set_drain_output(true); + *self = Kex::StartKexInit + } + fn take(&mut self) -> Self { debug_assert!(!matches!(self, Kex::Taken)); core::mem::replace(self, Kex::Taken) } - /// Sends a `KexInit` message. Must be called from `Idle` state - pub fn send_kexinit( - &mut self, - conf: &AlgoConfig, - s: &mut TrafSend, - ) -> Result<()> { - if !matches!(self, Kex::Idle) { - return Err(Error::bug()); + /// Test if a KEX is in progress. + /// + /// This is from a sending perspective. Returns true after KexInit + /// has been sent and before NewKeys has been sent. + /// During that interval non-KEX packets are disallowed. + pub fn is_sending(&self) -> bool { + matches!(self, Kex::KexInit { .. } | Kex::KexDH { .. }) + } + + /// Test if a KEX is in progress from a receiving perspective. + /// + /// true after KexInit has been received and before NewKeys has been received. + pub fn is_receiving(&self) -> bool { + matches!( + self, + Kex::KexDH { .. } | Kex::StartKexDH { .. } | Kex::NewKeys { .. } + ) + } + + pub fn progress(&mut self, conf: &AlgoConfig, s: &mut TrafSend) -> Result<()> { + // Send KexInit if TrafOut has drained. + // TODO: It isn't necessary for TrafOut to drain entirely, it would be OK + // to instead check that it has adequate space for all the packets + // that would be sent in a KEX sequence. + trace!("{self:?}"); + match self { + Kex::Idle => { + if s.is_rekey_needed() { + self.start_kexinit(s); + } + } + Kex::StartKexInit => { + if s.is_drained() { + s.set_drain_output(false); + let our_cookie = KexCookie::generate()?; + s.send(make_kexinit(&our_cookie, conf))?; + *self = Kex::KexInit { our_cookie }; + } + } + Kex::StartKexDH { .. } => { + if s.is_drained() { + s.set_drain_output(false); + let Kex::StartKexDH { our_cookie, mut algos, kex_hash } = + self.take() + else { + unreachable!(); + }; + s.send(make_kexinit(&our_cookie, conf))?; + if CS::is_client() { + algos.kex.send_kexdhinit(s)?; + } + *self = Kex::KexDH { algos, kex_hash }; + } + } + Kex::KexInit { .. } | Kex::KexDH { .. } | Kex::NewKeys { .. } => { + // waiting for incoming packets, no progress + } + Kex::Taken => return Err(Error::bug()), } - let mut our_cookie = KexCookie([0u8; 16]); - random::fill_random(our_cookie.0.as_mut_slice())?; - s.send(Self::make_kexinit(&our_cookie, conf))?; - *self = Kex::KexInit { our_cookie }; Ok(()) } @@ -322,18 +398,18 @@ impl Kex { first_kex: bool, s: &mut TrafSend, ) -> Result<()> { - // Reply if we haven't already received one. This will bump the state to Kex::KexInit - if let Kex::Idle = self { - self.send_kexinit(algo_conf, s)?; - } - - let our_cookie = if let Kex::KexInit { ref our_cookie } = self { - our_cookie - } else { - // already received a KexInit - return error::PacketWrong.fail(); + let our_cookie = match self { + Kex::KexInit { our_cookie } => our_cookie.clone(), + Kex::Idle | Kex::StartKexInit => KexCookie::generate()?, + _ => { + // already received a KexInit + return error::PacketWrong.fail(); + } }; + // start is set when a KexInit hasn't yet been sent. + let start = matches!(self, Kex::Idle | Kex::StartKexInit); + let mut algos = Self::algo_negotiation(&remote_kexinit, algo_conf)?; debug!("{algos}"); @@ -341,9 +417,6 @@ impl Kex { debug!("kexinit has strict kex but wasn't first packet"); return error::PacketWrong.fail(); } - if CS::is_client() { - algos.kex.send_kexdhinit(s)?; - } if first_kex && algos.strict_kex { // strict-kex in initial KEXINIT enables it s.enable_strict_kex(); @@ -351,31 +424,24 @@ impl Kex { let kex_hash = KexHash::new::( algo_conf, - our_cookie, + &our_cookie, remote_version, &remote_kexinit.into(), )?; - *self = Kex::KexDH { algos, kex_hash }; - Ok(()) - } - fn make_kexinit<'a>(cookie: &KexCookie, conf: &'a AlgoConfig) -> Packet<'a> { - packets::KexInit { - cookie: cookie.clone(), - kex: (&conf.kexs).into(), - hostsig: (&conf.hostsig).into(), - cipher_c2s: (&conf.ciphers).into(), - cipher_s2c: (&conf.ciphers).into(), - mac_c2s: (&conf.macs).into(), - mac_s2c: (&conf.macs).into(), - comp_c2s: (&conf.comps).into(), - comp_s2c: (&conf.comps).into(), - lang_c2s: NameList::empty(), - lang_s2c: NameList::empty(), - first_follows: false, - reserved: 0, - } - .into() + *self = if start { + if matches!(self, Kex::Idle) { + s.set_drain_output(true); + } + // client kexdhinit will be sent after KexInit is sent. + Kex::StartKexDH { our_cookie, algos, kex_hash } + } else { + if CS::is_client() { + algos.kex.send_kexdhinit(s)?; + } + Kex::KexDH { algos, kex_hash } + }; + Ok(()) } pub fn handle_newkeys( @@ -514,11 +580,12 @@ impl Kex { } pub fn is_strict(&self) -> bool { - matches!( - self, - Kex::KexDH { algos: Algos { strict_kex: true, .. }, .. } - | Kex::NewKeys { algos: Algos { strict_kex: true, .. }, .. } - ) + match self { + Kex::StartKexDH { algos, .. } + | Kex::KexDH { algos, .. } + | Kex::NewKeys { algos, .. } => algos.strict_kex, + _ => false, + } } pub fn handle_kexdhreply(&self) -> Result { @@ -526,6 +593,7 @@ impl Kex { trace!("kexdhreply not client"); return error::SSHProto.fail(); } + if !matches!(self, Kex::KexDH { .. }) { return error::PacketWrong.fail(); } @@ -778,6 +846,25 @@ impl SharedSecret { } } +pub fn make_kexinit<'a>(cookie: &KexCookie, conf: &'a AlgoConfig) -> Packet<'a> { + packets::KexInit { + cookie: cookie.clone(), + kex: (&conf.kexs).into(), + hostsig: (&conf.hostsig).into(), + cipher_c2s: (&conf.ciphers).into(), + cipher_s2c: (&conf.ciphers).into(), + mac_c2s: (&conf.macs).into(), + mac_s2c: (&conf.macs).into(), + comp_c2s: (&conf.comps).into(), + comp_s2c: (&conf.comps).into(), + lang_c2s: NameList::empty(), + lang_s2c: NameList::empty(), + first_follows: false, + reserved: 0, + } + .into() +} + // TODO ZeroizeOnDrop. Sha256 doesn't support it yet. // https://github.com/RustCrypto/hashes/issues/87 pub(crate) struct KexOutput { @@ -1213,8 +1300,12 @@ mod tests { let mut cli = kex::Kex::new(); let mut serv = kex::Kex::new(); - serv.send_kexinit(&serv_conf, &mut ts.sender()).unwrap(); - cli.send_kexinit(&cli_conf, &mut tc.sender()).unwrap(); + serv.start_kexinit(&mut ts.sender()); + serv.progress(&serv_conf, &mut ts.sender()).unwrap(); + cli.start_kexinit(&mut tc.sender()); + cli.progress(&cli_conf, &mut tc.sender()).unwrap(); + assert!(matches!(serv, Kex::KexInit { .. })); + assert!(matches!(cli, Kex::KexInit { .. })); let cli_init = tc.next().unwrap(); let cli_init = if let Packet::KexInit(k) = cli_init { k } else { panic!() }; diff --git a/src/packets.rs b/src/packets.rs index 4b3cb53..b975bce 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -81,15 +81,15 @@ pub struct DebugPacket<'a> { pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct Disconnect<'a> { pub reason: u32, pub desc: TextString<'a>, - pub lang: TextString<'a>, + pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct Unimplemented { pub seq: u32, @@ -115,7 +115,7 @@ pub struct ServiceRequest<'a> { pub name: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ServiceAccept<'a> { pub name: &'a str, @@ -312,7 +312,7 @@ pub struct UserauthFailure<'a> { pub partial: bool, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct UserauthSuccess {} @@ -672,7 +672,7 @@ pub enum GlobalRequestMethod<'a> { // pub port: u32, // } -#[derive(Debug, SSHEncode)] +#[derive(Debug, SSHEncode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[sshwire(no_variant_names)] pub enum RequestSuccess { @@ -699,7 +699,7 @@ impl<'de> SSHDecode<'de> for RequestSuccess { // pub port: u32, // } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct RequestFailure {} @@ -731,7 +731,7 @@ pub enum ChannelOpenType<'a> { Unknown(Unknown<'a>), } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelOpenConfirmation { pub num: u32, @@ -740,7 +740,7 @@ pub struct ChannelOpenConfirmation { pub max_packet: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelOpenFailure<'a> { pub num: u32, @@ -749,7 +749,7 @@ pub struct ChannelOpenFailure<'a> { pub lang: &'a str, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelWindowAdjust { pub num: u32, @@ -781,25 +781,25 @@ impl ChannelDataExt<'_> { pub const DATA_OFFSET: usize = 13; } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelEof { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelClose { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelSuccess { pub num: u32, } -#[derive(Debug, SSHEncode, SSHDecode)] +#[derive(Debug, SSHEncode, SSHDecode, Clone)] #[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct ChannelFailure { pub num: u32, diff --git a/src/runner.rs b/src/runner.rs index 8e9360d..7ae397e 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -229,13 +229,19 @@ impl<'a> Runner<'a, server::Server> { )); let mut s = self.traf_out.sender(&mut self.keys); - let ev = self.conn.resume_servauth(allow, &mut s)?; - self.set_extra_resume(ev); + let ev = self.conn.resume_servauth(allow, &mut s); + let r = match ev { + Ok(ev) => { + self.set_extra_resume(ev); + Ok(()) + } + Err(e) => Err(e), + }; // auth packets have passwords self.traf_in.zeroize_payload(); self.resume_nocheck(); - Ok(()) + r } pub(crate) fn resume_servauth_pkok(&mut self) -> Result<()> { @@ -317,6 +323,21 @@ impl<'a, CS: CliServ> Runner<'a, CS> { self.traf_in.done_payload(); } + // Try moving packets from the deferred queue to the + // normal output. Can't happen during kex since non-kex + // packets are disallowed. + // KEX packets aren't included in the deferred list. + if !self.conn.is_kex_sending() { + match self.traf_out.send_deferred_packets(&mut self.keys) { + Ok(()) => (), + Err(Error::NoRoom { .. }) => { + // try again once there's space + return Ok(Event::None); + } + Err(e) => return Err(e), + } + } + let mut disp = Dispatched::default(); // Handle incoming packets @@ -405,8 +426,20 @@ impl<'a, CS: CliServ> Runner<'a, CS> { // Whether [`input()`](input) is ready pub fn is_input_ready(&self) -> bool { - (self.conn.initial_sent() && self.traf_in.is_input_ready()) - || self.closed_input + if self.closed_input { + return true; + } + + // During KEX, let the output queue drain before accepting packets, + // so we can be sure not to hit NoRoom. + // kexdhinit/kexdhreply packets can be large. + // The deferred queue may still have packets since they + // can't be send during a KEX. + if self.conn.is_kex_sending() && self.traf_out.is_output_pending() { + return false; + } + + self.conn.initial_sent() && self.traf_in.is_input_ready() } /// Set a waker to be notified when [`input()`](Self::input) is ready to be called. @@ -602,12 +635,31 @@ impl<'a, CS: CliServ> Runner<'a, CS> { // Avoid apps polling forever on a packet type that won't come dt.validate_send(CS::is_client())?; - if !self.conn.kex_is_idle() { + // When write_channel_ready() returns Some(0), a subsequent + // channel_wake_write() needs to occur. + + // channel_wake_write() after KexDone. + if self.conn.is_kex_sending() { // Only KEX messages are allowed during key exchange, // not data. return Ok(Some(0)); } + // Drain only happens prior to KEX, so can be woken after KexDone. + if self.traf_out.is_draining() { + // Continual channel data could prevent drain from completing, + // so disallow channel writes when draining. + return Ok(Some(0)); + } + + // write_channel_ready() will happen after the deferred packets are + // moved to main queued then written. + if self.traf_out.have_deferred_packets() { + // Let deferred packets get moved to the output queue + // before sending more channel data. + return Ok(Some(0)); + } + // minimum of buffer space and channel window available let payload_space = self.traf_out.send_allowed(&self.keys); // subtract space for packet headers prior to data diff --git a/src/server.rs b/src/server.rs index 7018dde..24c2574 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,6 +22,7 @@ impl Server { s: &mut TrafSend, ) -> Result<()> { let success = match p.name { + // Any matched names here must also be in static_service_name() SSH_SERVICE_USERAUTH => true, SSH_SERVICE_CONNECTION => self.auth.is_authed(), _ => false, diff --git a/src/sshwire.rs b/src/sshwire.rs index f9a08d1..f0ced13 100644 --- a/src/sshwire.rs +++ b/src/sshwire.rs @@ -323,6 +323,10 @@ impl SSHEncode for heapless::String { pub struct TextString<'a>(pub &'a [u8]); impl<'a> TextString<'a> { + pub fn new() -> TextString<'static> { + TextString(&[]) + } + /// Returns the UTF-8 decoded string, using [`core::str::from_utf8`] /// /// Don't call this if you are avoiding including UTF-8 routines in diff --git a/src/traffic.rs b/src/traffic.rs index 1b576d7..37d00e3 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -11,11 +11,20 @@ use zeroize::{Zeroize, ZeroizeOnDrop}; #[cfg(feature = "alloc")] use alloc::boxed::Box; +use heapless::Deque; -use crate::channel::{ChanData, ChanNum}; use crate::encrypt::{KeyState, KeysRecv, KeysSend, SSH_PAYLOAD_START}; use crate::ident::RemoteVersion; use crate::*; +use crate::{ + channel::{ChanData, ChanNum}, + packets::Packet, +}; + +/// Number of `DeferredPacket`s to queue. +/// +/// Each entry takes around 40 bytes. +const DEFER_COUNT: usize = 10; // Either a slice or boxed array. // Similar to managed::ManagedSlice. @@ -356,6 +365,13 @@ pub(crate) struct TrafOut<'a> { /// in-place as they are written to `buf`. buf: SliceOrVec<'a>, state: TxState, + + drain: bool, + + // Set between sending KexInit and sending NewKeys. + sending_kex: bool, + + deferred_packets: Deque, } /// State machine for writes @@ -395,18 +411,75 @@ impl TrafIn<'static> { impl<'a> TrafOut<'a> { pub fn new(buf: &'a mut [u8]) -> Self { - Self { buf: SliceOrVec::Borrowed(buf), state: TxState::Idle } + Self { + buf: SliceOrVec::Borrowed(buf), + state: TxState::Idle, + drain: false, + sending_kex: false, + deferred_packets: Deque::new(), + } } /// Serializes and and encrypts a packet to send + /// + /// If the output buffer is full or a rekey is in progress, the + /// packet will be enqueued to be sent later. If the deferred packet queue + /// is full, `NoRoom` will be returned. + /// + /// `BusySend` error is recoverable, others should be treated as fatal. pub(crate) fn send_packet( &mut self, p: packets::Packet, keys: &mut KeyState, ) -> Result<()> { - // Sanity check + let is_kex = matches!(p.category(), packets::Category::Kex); + + if is_kex || (self.deferred_packets.is_empty() && !self.sending_kex) { + // Send the packet normally if it fits. + // KEX packets can be sent even if other packets are deferred + // (KEX packets don't get deferred themselves). + // A KexInit is only sent when there are no deferred packets, + // so we don't need to worry about incorrect reordering. + match self.send_packet_inner(&p, keys) { + Err(Error::NoRoom { .. }) => { + debug_assert!(!is_kex, "KEX packets should have room"); + // non-kex packets get deferred + } + res => return res, + } + } + + // Either it didn't fit (NoRoom), or the deferred queue + // is already in use so we need to enqueue after that. + // Attempt to defer the packet. + + let pnum = p.message_num(); + trace!("Delay packet type {pnum:?}"); + + // Convert to a DeferredPacket if possible + let Ok(dp) = DeferredPacket::try_from(p) else { + // Packet type isn't expected to be deferred. + trace!("NoRoom packet type {pnum:?}"); + return error::BusySend { packet: pnum, unsupported: true }.fail(); + }; + + self.deferred_packets.push_front(dp).map_err(|_| { + error!("No space to queue packet"); + trace!("NoRoom packet type {pnum:?}"); + error::BusySend { packet: pnum, unsupported: false }.build() + }) + } + + // Check some invariants, and track whether we're sending KEX. + fn track_send_packet( + &mut self, + p: &packets::Packet, + keys: &mut KeyState, + ) -> Result<()> { + // Check that packets are being encrypted + // This is checked in release and debug. match p.category() { - packets::Category::All | packets::Category::Kex => (), // OK cleartext + packets::Category::All | packets::Category::Kex => (), _ => { if keys.is_send_cleartext() { return Error::bug_msg("send cleartext"); @@ -414,6 +487,45 @@ impl<'a> TrafOut<'a> { } } + // KEX send packet catetory checked in debug builds for fuzzing. + if self.sending_kex { + // strict kex is ignored since we don't have access to + // conn.kex. + debug_assert!(matches!( + p.category(), + packets::Category::All | packets::Category::Kex + )); + } + + // Track KEX sending state + match p { + Packet::KexInit(_) => { + debug_assert!(!self.sending_kex); + self.sending_kex = true; + } + Packet::NewKeys(_) => { + debug_assert!(self.sending_kex); + self.sending_kex = false; + } + _ => (), + } + + Ok(()) + } + + /// Serializes and and encrypts a packet to send + /// + /// The packet will not be enqueued to the deferred queue. + /// This function should not usually be called directly. + /// + /// `NoRoom` error is recoverable, others should be treated as fatal. + pub fn send_packet_inner( + &mut self, + p: &packets::Packet, + keys: &mut KeyState, + ) -> Result<()> { + self.track_send_packet(p, keys)?; + // Either a fresh buffer or appending to write let (idx, len) = match self.state { TxState::Idle => (0, 0), @@ -439,20 +551,41 @@ impl<'a> TrafOut<'a> { Ok(()) } + pub fn send_deferred_packets(&mut self, keys: &mut KeyState) -> Result<()> { + while let Some(d) = self.deferred_packets.back() { + let p = Packet::from(d); + match self.send_packet_inner(&p, keys) { + Ok(()) => { + self.deferred_packets.pop_back(); + } + Err(Error::NoRoom { .. }) => { + // Can't progress, let the caller retry later + break; + } + Err(e) => return Err(e), + } + } + + Ok(()) + } + + pub fn have_deferred_packets(&self) -> bool { + !self.deferred_packets.is_empty() + } + pub fn is_output_pending(&self) -> bool { trace!("is_output_pending st {:?}", self.state); matches!(self.state, TxState::Write { .. }) } - /// A simple test if a packet can be sent. `send_allowed` should be used - /// for more general situations - pub fn can_output(&self) -> bool { - // TODO don't use this - true - } - /// Returns payload space available to send a packet. Returns 0 if not ready or full pub fn send_allowed(&self, keys: &KeyState) -> usize { + if !self.deferred_packets.is_empty() { + // Don't allow sending packets when deferred ones are waiting. + // Otherwise the deferred queue will run out of room. + return 0; + } + // TODO: test for full output buffer match self.state { TxState::Write { len, .. } => keys.max_enc_payload(self.buf.len() - len), @@ -508,6 +641,16 @@ impl<'a> TrafOut<'a> { pub fn sender<'s>(&'s mut self, keys: &'s mut KeyState) -> TrafSend<'s, 'a> { TrafSend::new(self, keys) } + + /// Return whether output is draining. + /// + /// Used to determine whether to initiate outbound traffic, such as channel writes. + /// Generally immediate responses to incoming messages should still be sent + /// even when draining. Otherwise they would need to be put in deferred_packets + /// which may run out. + pub fn is_draining(&self) -> bool { + self.drain + } } #[cfg(feature = "alloc")] @@ -546,10 +689,6 @@ impl<'s, 'a> TrafSend<'s, 'a> { self.out.send_version() } - pub fn can_output(&self) -> bool { - self.out.can_output() - } - /// Returns the current receive sequence number pub fn recv_seq(&self) -> u32 { self.keys.seq_decrypt.0 @@ -558,4 +697,108 @@ impl<'s, 'a> TrafSend<'s, 'a> { pub fn enable_strict_kex(&mut self) { self.keys.enable_strict_kex(); } + + pub fn is_rekey_needed(&self) -> bool { + self.keys.is_rekey_needed() + } + + /// Set TrafOut to start draining output. + /// + /// Only one caller/area should be using set_drain_output() at a time. + /// For `TrafOut` itself there isn't a problem with multiple + /// callers enabling/disabling drain, but it could result in races + /// between callers. Only kex should be using it currently, so + /// there is a debug_assert! to that effect. + pub fn set_drain_output(&mut self, drain: bool) { + debug_assert!(drain != self.out.drain, "set_drain_output() dupe"); + self.out.drain = drain; + } + + /// Test if output buffer is empty. + /// + /// Fails if `set_drain_output(true)` wasn't set (debug panic) + /// This isn't inherent, but helps catch misuse (see comment + /// for set_drain_output()). + pub fn is_drained(&self) -> bool { + debug_assert!(self.out.drain); + matches!(self.out.state, TxState::Idle) + && self.out.deferred_packets.is_empty() + } +} + +/// Packet types that may be sent once a currently-NoRoom TrafOut clears. +/// +/// Rather than storing an entire `Packet<'static>`, keep a queue of smaller +/// `DeferredPacket`s. +/// +/// These packet types account for most packets that may be sent as responses +/// or from other asynchronous events (rekey packet count reached, as an example). +/// +/// These also queue packets to be sent while a KEX is in progress +/// (other packet types aren't allowed) +/// +/// Some packet types aren't included here since they're deferred via other +/// mechanisms: +/// +/// - ChannelWindowAdjust. Can be retried later. +/// - KEX packets. The output buffer is drained at the start of a KEX. +/// - Userauth - we hope it only occurs early when traffic is +/// well defined (no channels) and no KEXes are happening. +#[derive(Debug)] +pub enum DeferredPacket { + ChannelSuccess(packets::ChannelSuccess), + ChannelFailure(packets::ChannelFailure), + ChannelOpenFailure(packets::ChannelOpenFailure<'static>), + ChannelOpenConfirmation(packets::ChannelOpenConfirmation), + ChannelEof(packets::ChannelEof), + ChannelClose(packets::ChannelClose), + Unimplemented(packets::Unimplemented), + RequestFailure(packets::RequestFailure), + RequestSuccess(packets::RequestSuccess), +} + +impl DeferredPacket {} + +impl From<&DeferredPacket> for Packet<'static> { + fn from(d: &DeferredPacket) -> Self { + match d { + DeferredPacket::ChannelSuccess(p) => (p.clone()).into(), + DeferredPacket::ChannelFailure(p) => (p.clone()).into(), + DeferredPacket::ChannelOpenFailure(p) => (p.clone()).into(), + DeferredPacket::ChannelOpenConfirmation(p) => (p.clone()).into(), + DeferredPacket::ChannelEof(p) => (p.clone()).into(), + DeferredPacket::ChannelClose(p) => (p.clone()).into(), + DeferredPacket::Unimplemented(p) => (p.clone()).into(), + DeferredPacket::RequestFailure(p) => (p.clone()).into(), + DeferredPacket::RequestSuccess(p) => (p.clone()).into(), + } + } +} + +impl<'a> TryFrom> for DeferredPacket { + type Error = Error; + fn try_from(packet: Packet<'a>) -> Result { + Ok(match packet { + Packet::ChannelSuccess(p) => Self::ChannelSuccess(p), + Packet::ChannelFailure(p) => Self::ChannelFailure(p), + Packet::ChannelOpenConfirmation(p) => Self::ChannelOpenConfirmation(p), + Packet::ChannelEof(p) => Self::ChannelEof(p), + Packet::ChannelClose(p) => Self::ChannelClose(p), + Packet::Unimplemented(p) => Self::Unimplemented(p), + Packet::RequestFailure(p) => Self::RequestFailure(p), + Packet::RequestSuccess(p) => Self::RequestSuccess(p), + + Packet::ChannelOpenFailure(p) => { + Self::ChannelOpenFailure(packets::ChannelOpenFailure { + // empty desc for 'static + desc: TextString::new(), + lang: "", + ..p + }) + } + + // Unhandled types + _ => return error::SSHProtoUnsupported.fail(), + }) + } }