diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index b071812b..66ccfdfe 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -9,8 +9,8 @@ use crate::ipc::IpcMessage; use libc::{ - self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ, - PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, + self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, MSG_EOR, + PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, }; use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK}; use libc::{iovec, msghdr, off_t, recvmsg, sendmsg}; @@ -244,7 +244,7 @@ impl OsIpcSender { /// /// This one is smaller than regular fragments, because it carries the message (size) header. fn first_fragment_size(sendbuf_size: usize) -> usize { - (Self::fragment_size(sendbuf_size) - mem::size_of::()) & (!8usize + 1) + (Self::fragment_size(sendbuf_size) - 2 * mem::size_of::()) & (!8usize + 1) // Ensure optimal alignment. } @@ -287,6 +287,12 @@ impl OsIpcSender { data_buffer: &[u8], len: usize, ) -> Result<(), UnixError> { + assert!(fds.len() <= MAX_FDS_IN_CMSG as usize); + + let mut fragment_size: usize = 0; + fragment_size = + mem::size_of_val(&fragment_size) + mem::size_of_val(&len) + data_buffer.len(); + let result = unsafe { let cmsg_length = mem::size_of_val(fds) as c_uint; let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 { @@ -310,11 +316,16 @@ impl OsIpcSender { }; let mut iovec = [ - // First fragment begins with a header recording the total data length. + // First fragment begins with a header recording the fragment size + // (including the header) and the total data length. // - // The receiver uses this to determine - // whether it already got the entire message, + // The receiver uses this to determine if the fragment has been fully + // read, and afterwards whether it already got the entire message, // or needs to receive additional fragments -- and if so, how much. + iovec { + iov_base: &fragment_size as *const _ as *mut c_void, + iov_len: mem::size_of_val(&fragment_size), + }, iovec { iov_base: &len as *const _ as *mut c_void, iov_len: mem::size_of_val(&len), @@ -326,12 +337,13 @@ impl OsIpcSender { ]; let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen); - let result = sendmsg(sender_fd, &msghdr, 0); + let result = sendmsg(sender_fd, &msghdr, MSG_EOR); libc::free(cmsg_buffer as *mut c_void); result }; if result > 0 { + assert_eq!(fragment_size, result.try_into().unwrap()); Ok(()) } else { Err(UnixError::last()) @@ -349,6 +361,7 @@ impl OsIpcSender { }; if result > 0 { + assert_eq!(data_buffer.len(), result.try_into().unwrap()); Ok(()) } else { Err(UnixError::last()) @@ -1048,39 +1061,77 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size()); main_data_buffer.set_len(OsIpcSender::get_max_fragment_size()); - let mut iovec = [ - iovec { - iov_base: &mut total_size as *mut _ as *mut c_void, - iov_len: mem::size_of_val(&total_size), - }, - iovec { - iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, - iov_len: main_data_buffer.len(), - }, - ]; - let mut cmsg = UnixCmsg::new(&mut iovec)?; + let mut fragment_size: usize = 0; - let bytes_read = cmsg.recv(fd, blocking_mode)?; - main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size)); + let mut bytes_read: usize = 0; - let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; - let cmsg_length = cmsg.msghdr.msg_controllen; - let channel_length = if cmsg_length == 0 { - 0 - } else { - // The control header is followed by an array of FDs. The size of the control header is - // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't - // exposed by libc. CMSG_SPACE(0) is the portable version of that.) - (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::() - }; - for index in 0..channel_length { - let fd = *cmsg_fds.add(index); - if is_socket(fd) { - channels.push(OsOpaqueIpcChannel::from_fd(fd)); - continue; + let mut blocking_mode = blocking_mode; + + loop { + let mut iovec = [ + iovec { + iov_base: &mut fragment_size as *mut _ as *mut c_void, + iov_len: mem::size_of_val(&fragment_size), + }, + iovec { + iov_base: &mut total_size as *mut _ as *mut c_void, + iov_len: mem::size_of_val(&total_size), + }, + iovec { + iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, + iov_len: main_data_buffer.len(), + }, + ]; + + let mut iovec: &mut [iovec] = &mut iovec; + + let mut offset = bytes_read; + while offset > 0 { + let change = offset.min(iovec[0].iov_len); + iovec[0].iov_base = iovec[0].iov_base.offset(change.try_into().unwrap()); + iovec[0].iov_len -= change; + offset -= change; + + if iovec[0].iov_len == 0 { + iovec = &mut iovec[1..]; + } } - shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + + let mut cmsg = UnixCmsg::new(iovec)?; + + bytes_read += cmsg.recv(fd, blocking_mode).inspect_err(|_| { + assert_eq!(bytes_read, 0); + })?; + + let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; + let cmsg_length = cmsg.msghdr.msg_controllen; + let channel_length = if cmsg_length == 0 { + 0 + } else { + // The control header is followed by an array of FDs. The size of the control header is + // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't + // exposed by libc. CMSG_SPACE(0) is the portable version of that.) + (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::() + }; + for index in 0..channel_length { + let fd = *cmsg_fds.add(index); + if is_socket(fd) { + channels.push(OsOpaqueIpcChannel::from_fd(fd)); + continue; + } + shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + } + + if bytes_read >= mem::size_of_val(&fragment_size) && bytes_read == fragment_size { + break; + } + + // Block on subsequent reads, to avoid returning mid-fragment + blocking_mode = BlockingMode::Blocking; } + + main_data_buffer + .set_len(bytes_read - mem::size_of_val(&fragment_size) - mem::size_of_val(&total_size)); } if total_size == main_data_buffer.len() {