Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 87 additions & 36 deletions src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

use crate::ipc::IpcMessage;
use libc::{
self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, MSG_EOR,
PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
};
use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
Expand Down Expand Up @@ -244,7 +244,7 @@ impl OsIpcSender {
///
/// This one is smaller than regular fragments, because it carries the message (size) header.
fn first_fragment_size(sendbuf_size: usize) -> usize {
(Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
(Self::fragment_size(sendbuf_size) - 2 * mem::size_of::<usize>()) & (!8usize + 1)
// Ensure optimal alignment.
}

Expand Down Expand Up @@ -287,6 +287,12 @@ impl OsIpcSender {
data_buffer: &[u8],
len: usize,
) -> Result<(), UnixError> {
assert!(fds.len() <= MAX_FDS_IN_CMSG as usize);

let mut fragment_size: usize = 0;
fragment_size =
mem::size_of_val(&fragment_size) + mem::size_of_val(&len) + data_buffer.len();

let result = unsafe {
let cmsg_length = mem::size_of_val(fds) as c_uint;
let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
Expand All @@ -310,11 +316,16 @@ impl OsIpcSender {
};

let mut iovec = [
// First fragment begins with a header recording the total data length.
// First fragment begins with a header recording the fragment size
// (including the header) and the total data length.
//
// The receiver uses this to determine
// whether it already got the entire message,
// The receiver uses this to determine if the fragment has been fully
// read, and afterwards whether it already got the entire message,
// or needs to receive additional fragments -- and if so, how much.
iovec {
iov_base: &fragment_size as *const _ as *mut c_void,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to use &mut here and skip the intermediate const pointer cast, but we can do that for both iovecs in a followup.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean &raw mut fragment_size as *mut c_void? I can't seem to get the compiler to be happy with a cast directly from a reference to a void pointer.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we probably still need a *mut _ cast in between, but the status quo of casting an immutable reference to a mutable pointer is obviously wrong.

iov_len: mem::size_of_val(&fragment_size),
},
iovec {
iov_base: &len as *const _ as *mut c_void,
iov_len: mem::size_of_val(&len),
Expand All @@ -326,12 +337,13 @@ impl OsIpcSender {
];

let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
let result = sendmsg(sender_fd, &msghdr, 0);
let result = sendmsg(sender_fd, &msghdr, MSG_EOR);
libc::free(cmsg_buffer as *mut c_void);
result
};

if result > 0 {
assert_eq!(fragment_size, result.try_into().unwrap());
Ok(())
} else {
Err(UnixError::last())
Expand All @@ -349,6 +361,7 @@ impl OsIpcSender {
};

if result > 0 {
assert_eq!(data_buffer.len(), result.try_into().unwrap());
Ok(())
} else {
Err(UnixError::last())
Expand Down Expand Up @@ -1048,39 +1061,77 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError>
main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

let mut iovec = [
iovec {
iov_base: &mut total_size as *mut _ as *mut c_void,
iov_len: mem::size_of_val(&total_size),
},
iovec {
iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
iov_len: main_data_buffer.len(),
},
];
let mut cmsg = UnixCmsg::new(&mut iovec)?;
let mut fragment_size: usize = 0;

let bytes_read = cmsg.recv(fd, blocking_mode)?;
main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));
let mut bytes_read: usize = 0;

let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
let cmsg_length = cmsg.msghdr.msg_controllen;
let channel_length = if cmsg_length == 0 {
0
} else {
// The control header is followed by an array of FDs. The size of the control header is
// determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't
// exposed by libc. CMSG_SPACE(0) is the portable version of that.)
(cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
};
for index in 0..channel_length {
let fd = *cmsg_fds.add(index);
if is_socket(fd) {
channels.push(OsOpaqueIpcChannel::from_fd(fd));
continue;
let mut blocking_mode = blocking_mode;

loop {
let mut iovec = [
iovec {
iov_base: &mut fragment_size as *mut _ as *mut c_void,
iov_len: mem::size_of_val(&fragment_size),
},
iovec {
iov_base: &mut total_size as *mut _ as *mut c_void,
iov_len: mem::size_of_val(&total_size),
},
iovec {
iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
iov_len: main_data_buffer.len(),
},
];

let mut iovec: &mut [iovec] = &mut iovec;

let mut offset = bytes_read;
while offset > 0 {
let change = offset.min(iovec[0].iov_len);
iovec[0].iov_base = iovec[0].iov_base.offset(change.try_into().unwrap());
iovec[0].iov_len -= change;
offset -= change;

if iovec[0].iov_len == 0 {
iovec = &mut iovec[1..];
}
}
shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));

let mut cmsg = UnixCmsg::new(iovec)?;

bytes_read += cmsg.recv(fd, blocking_mode).inspect_err(|_| {
assert_eq!(bytes_read, 0);
})?;

let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
let cmsg_length = cmsg.msghdr.msg_controllen;
let channel_length = if cmsg_length == 0 {
0
} else {
// The control header is followed by an array of FDs. The size of the control header is
// determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't
// exposed by libc. CMSG_SPACE(0) is the portable version of that.)
(cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
};
for index in 0..channel_length {
let fd = *cmsg_fds.add(index);
if is_socket(fd) {
channels.push(OsOpaqueIpcChannel::from_fd(fd));
continue;
}
shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
}

if bytes_read >= mem::size_of_val(&fragment_size) && bytes_read == fragment_size {
break;
}

// Block on subsequent reads, to avoid returning mid-fragment
blocking_mode = BlockingMode::Blocking;
}

main_data_buffer
.set_len(bytes_read - mem::size_of_val(&fragment_size) - mem::size_of_val(&total_size));
}

if total_size == main_data_buffer.len() {
Expand Down
Loading