From 674206c0cfebe4d2476e5af8a3a7f73cb6a85410 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juhani=20Krekel=C3=A4?= Date: Mon, 9 Mar 2026 00:38:31 +0200 Subject: [PATCH] Use MSG_EOR to mark fragment boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ipc-channel relies on the fragments being read as atomic chunks, which is not guaranteed to be true for plain recvmsg(2) on SOCK_SEQPACKET cross-platform. Namely, on FreeBSD it's possible for this to return both more and less than a single fragment. SOCK_SEQPACKET can be thought of as a variant of SOCK_STREAM with the ability to set "end of record" markers using MSG_EOR. A single recvmsg(2) will not read past such a marker. While the fact that the marker has been reached is supposed to be communicated back to the caller through msg_flags, as of now Linux does not properly implement this on Unix-domain sockets. To handle cases where the recvmsg(2) returns less than a single fragment, include the size of the fragment in the header and continue reads until the entire fragment has been read. Since SOCK_SEQPACKET maintains ordering, as long as sending the fragments is atomic, this will not will not run the danger of interleaving of different messages. Empirically, sendmsg(2) as it is used by ipc-channel is atomic at least on FreeBSD. To make any possible resultant bugs easier to find, add asserts for that. Signed-off-by: Juhani Krekelä --- src/platform/unix/mod.rs | 123 +++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 36 deletions(-) diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index b071812b..66ccfdfe 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -9,8 +9,8 @@ use crate::ipc::IpcMessage; use libc::{ - self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ, - PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, + self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, MSG_EOR, + PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, }; use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK}; use libc::{iovec, msghdr, off_t, recvmsg, sendmsg}; @@ -244,7 +244,7 @@ impl OsIpcSender { /// /// This one is smaller than regular fragments, because it carries the message (size) header. fn first_fragment_size(sendbuf_size: usize) -> usize { - (Self::fragment_size(sendbuf_size) - mem::size_of::()) & (!8usize + 1) + (Self::fragment_size(sendbuf_size) - 2 * mem::size_of::()) & (!8usize + 1) // Ensure optimal alignment. } @@ -287,6 +287,12 @@ impl OsIpcSender { data_buffer: &[u8], len: usize, ) -> Result<(), UnixError> { + assert!(fds.len() <= MAX_FDS_IN_CMSG as usize); + + let mut fragment_size: usize = 0; + fragment_size = + mem::size_of_val(&fragment_size) + mem::size_of_val(&len) + data_buffer.len(); + let result = unsafe { let cmsg_length = mem::size_of_val(fds) as c_uint; let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 { @@ -310,11 +316,16 @@ impl OsIpcSender { }; let mut iovec = [ - // First fragment begins with a header recording the total data length. + // First fragment begins with a header recording the fragment size + // (including the header) and the total data length. // - // The receiver uses this to determine - // whether it already got the entire message, + // The receiver uses this to determine if the fragment has been fully + // read, and afterwards whether it already got the entire message, // or needs to receive additional fragments -- and if so, how much. + iovec { + iov_base: &fragment_size as *const _ as *mut c_void, + iov_len: mem::size_of_val(&fragment_size), + }, iovec { iov_base: &len as *const _ as *mut c_void, iov_len: mem::size_of_val(&len), @@ -326,12 +337,13 @@ impl OsIpcSender { ]; let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen); - let result = sendmsg(sender_fd, &msghdr, 0); + let result = sendmsg(sender_fd, &msghdr, MSG_EOR); libc::free(cmsg_buffer as *mut c_void); result }; if result > 0 { + assert_eq!(fragment_size, result.try_into().unwrap()); Ok(()) } else { Err(UnixError::last()) @@ -349,6 +361,7 @@ impl OsIpcSender { }; if result > 0 { + assert_eq!(data_buffer.len(), result.try_into().unwrap()); Ok(()) } else { Err(UnixError::last()) @@ -1048,39 +1061,77 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size()); main_data_buffer.set_len(OsIpcSender::get_max_fragment_size()); - let mut iovec = [ - iovec { - iov_base: &mut total_size as *mut _ as *mut c_void, - iov_len: mem::size_of_val(&total_size), - }, - iovec { - iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, - iov_len: main_data_buffer.len(), - }, - ]; - let mut cmsg = UnixCmsg::new(&mut iovec)?; + let mut fragment_size: usize = 0; - let bytes_read = cmsg.recv(fd, blocking_mode)?; - main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size)); + let mut bytes_read: usize = 0; - let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; - let cmsg_length = cmsg.msghdr.msg_controllen; - let channel_length = if cmsg_length == 0 { - 0 - } else { - // The control header is followed by an array of FDs. The size of the control header is - // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't - // exposed by libc. CMSG_SPACE(0) is the portable version of that.) - (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::() - }; - for index in 0..channel_length { - let fd = *cmsg_fds.add(index); - if is_socket(fd) { - channels.push(OsOpaqueIpcChannel::from_fd(fd)); - continue; + let mut blocking_mode = blocking_mode; + + loop { + let mut iovec = [ + iovec { + iov_base: &mut fragment_size as *mut _ as *mut c_void, + iov_len: mem::size_of_val(&fragment_size), + }, + iovec { + iov_base: &mut total_size as *mut _ as *mut c_void, + iov_len: mem::size_of_val(&total_size), + }, + iovec { + iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, + iov_len: main_data_buffer.len(), + }, + ]; + + let mut iovec: &mut [iovec] = &mut iovec; + + let mut offset = bytes_read; + while offset > 0 { + let change = offset.min(iovec[0].iov_len); + iovec[0].iov_base = iovec[0].iov_base.offset(change.try_into().unwrap()); + iovec[0].iov_len -= change; + offset -= change; + + if iovec[0].iov_len == 0 { + iovec = &mut iovec[1..]; + } } - shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + + let mut cmsg = UnixCmsg::new(iovec)?; + + bytes_read += cmsg.recv(fd, blocking_mode).inspect_err(|_| { + assert_eq!(bytes_read, 0); + })?; + + let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; + let cmsg_length = cmsg.msghdr.msg_controllen; + let channel_length = if cmsg_length == 0 { + 0 + } else { + // The control header is followed by an array of FDs. The size of the control header is + // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't + // exposed by libc. CMSG_SPACE(0) is the portable version of that.) + (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::() + }; + for index in 0..channel_length { + let fd = *cmsg_fds.add(index); + if is_socket(fd) { + channels.push(OsOpaqueIpcChannel::from_fd(fd)); + continue; + } + shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + } + + if bytes_read >= mem::size_of_val(&fragment_size) && bytes_read == fragment_size { + break; + } + + // Block on subsequent reads, to avoid returning mid-fragment + blocking_mode = BlockingMode::Blocking; } + + main_data_buffer + .set_len(bytes_read - mem::size_of_val(&fragment_size) - mem::size_of_val(&total_size)); } if total_size == main_data_buffer.len() {