From 954470b1ffa01fad7bbd8594bb45624da2d4f839 Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Sat, 17 Jan 2026 20:20:14 -0800 Subject: [PATCH 1/6] Problem: ZMQ_STREAM cannot disconnect when SNDHWM is reached Solution: defer HWM check to the second frame in stream_t::xsend --- src/stream.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/stream.cpp b/src/stream.cpp index 7d10b62b86..beacfe6bfc 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -79,9 +79,6 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = out_pipe->pipe; if (!_current_out->check_write ()) { out_pipe->active = false; - _current_out = NULL; - errno = EAGAIN; - return -1; } } else { errno = EHOSTUNREACH; @@ -119,6 +116,11 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = NULL; return 0; } + if (!_current_out->check_write()) { + _more_out = true; + errno = EAGAIN; + return -1; + } const bool ok = _current_out->write (msg_); if (likely (ok)) _current_out->flush (); From 90fbd6137eefda0de7cb69529122f9e7cf388c65 Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Sat, 17 Jan 2026 20:20:14 -0800 Subject: [PATCH 2/6] Relicensing grant: guyco3 I hereby agree to license my contributions to libzmq under the terms of the MPLv2. From 0c9b1ca619a8844487737d8b5a74e925db8ddcaa Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Sat, 17 Jan 2026 20:48:17 -0800 Subject: [PATCH 3/6] Problem: ZMQ_STREAM cannot disconnect when SNDHWM is reached In ZMQ_STREAM, sending the routing ID frame followed by an empty payload signals a disconnect. If the SNDHWM is reached, the routing ID frame is blocked by check_write(), returning EAGAIN and preventing the disconnect signal from being processed. Solution: defer HWM check to the payload frame in stream_t::xsend. Also added a regression test in tests/test_stream_hwm_disconnect.cpp. Fixes #4828 --- Makefile.am | 5 ++ tests/CMakeLists.txt | 1 + tests/test_stream_hwm_disconnect.cpp | 87 ++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 tests/test_stream_hwm_disconnect.cpp diff --git a/Makefile.am b/Makefile.am index 17dc1e6e16..f2effe1f57 100755 --- a/Makefile.am +++ b/Makefile.am @@ -461,6 +461,7 @@ test_apps = \ tests/test_stream \ tests/test_stream_empty \ tests/test_stream_disconnect \ + tests/test_stream_hwm_disconnect \ tests/test_stream_timeout \ tests/test_disconnect_inproc \ tests/test_unbind_wildcard \ @@ -639,6 +640,10 @@ tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_stream_hwm_disconnect_SOURCES = tests/test_stream_hwm_disconnect.cpp +tests_test_stream_hwm_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_stream_hwm_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp tests_test_disconnect_inproc_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_disconnect_inproc_CPPFLAGS = ${TESTUTIL_CPPFLAGS} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 12ac80c175..871c85a1f6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -27,6 +27,7 @@ set(tests test_stream test_stream_empty test_stream_disconnect + test_stream_hwm_disconnect test_disconnect_inproc test_unbind_wildcard test_ctx_options diff --git a/tests/test_stream_hwm_disconnect.cpp b/tests/test_stream_hwm_disconnect.cpp new file mode 100644 index 0000000000..a8825dd75a --- /dev/null +++ b/tests/test_stream_hwm_disconnect.cpp @@ -0,0 +1,87 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +SETUP_TEARDOWN_TESTCONTEXT + +void test_stream_hwm_disconnect () +{ + void *stream = test_context_socket (ZMQ_STREAM); + char endpoint[MAX_SOCKET_STRING]; + + // Set a low Send High Water Mark to trigger the issue quickly + int sndhwm = 3; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (stream, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm))); + + // Bind the STREAM socket to a loopback address + bind_loopback_ipv4 (stream, endpoint, sizeof (endpoint)); + + // Connect a raw TCP socket to the ZMQ_STREAM socket + fd_t fd = connect_socket (endpoint); + + // STREAM socket receives two frames on connection: + // 1. The routing ID of the new peer + zmq_msg_t routing_id; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); + + // Store routing ID for later use in disconnection + size_t id_size = zmq_msg_size (&routing_id); + void *id_data = zmq_msg_data (&routing_id); + + // 2. An empty frame (connection notification) + TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); + zmq_msg_t empty; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&empty)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&empty, stream, 0)); + TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&empty)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&empty)); + + // Fill the outgoing pipe until it hits the High Water Mark. + // In ZMQ_STREAM, we send [Routing ID][Data]. + while (true) { + // Send Routing ID frame + int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); + if (rc == -1) + break; + + // Send a large data frame to fill the buffer + zmq_msg_t data; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&data, 262144)); + rc = zmq_msg_send (&data, stream, ZMQ_DONTWAIT); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data)); + if (rc == -1) + break; + } + + // Verify that we actually reached the HWM + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // TEST: Attempt to disconnect the client by sending the Routing ID + // followed by a 0-byte payload. + // Before the fix, the first frame (Routing ID) would fail with EAGAIN. + int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); + TEST_ASSERT_EQUAL_INT ((int) id_size, rc); + + // The second frame (0-byte) should trigger the termination logic + rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Cleanup resources + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&routing_id)); + close (fd); // Standard POSIX close as seen in other test files + test_context_socket_close (stream); +} + +int main (int, char **) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_stream_hwm_disconnect); + return UNITY_END (); +} \ No newline at end of file From 13e5c52cf03267bdb7ad0d2e9bf999654ae9e793 Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Sat, 17 Jan 2026 20:48:22 -0800 Subject: [PATCH 4/6] Relicensing grant: guyco3 I hereby agree to license my contributions to libzmq under the terms of the MPLv2. --- src/stream.cpp | 2 +- tests/test_stream_hwm_disconnect.cpp | 32 +++++++++++++++++----------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/stream.cpp b/src/stream.cpp index beacfe6bfc..0d852e2516 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -116,7 +116,7 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = NULL; return 0; } - if (!_current_out->check_write()) { + if (!_current_out->check_write ()) { _more_out = true; errno = EAGAIN; return -1; diff --git a/tests/test_stream_hwm_disconnect.cpp b/tests/test_stream_hwm_disconnect.cpp index a8825dd75a..23e893e86d 100644 --- a/tests/test_stream_hwm_disconnect.cpp +++ b/tests/test_stream_hwm_disconnect.cpp @@ -23,16 +23,16 @@ void test_stream_hwm_disconnect () // Connect a raw TCP socket to the ZMQ_STREAM socket fd_t fd = connect_socket (endpoint); - // STREAM socket receives two frames on connection: + // STREAM socket receives two frames on connection: // 1. The routing ID of the new peer zmq_msg_t routing_id; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); - + // Store routing ID for later use in disconnection size_t id_size = zmq_msg_size (&routing_id); void *id_data = zmq_msg_data (&routing_id); - + // 2. An empty frame (connection notification) TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); zmq_msg_t empty; @@ -45,7 +45,8 @@ void test_stream_hwm_disconnect () // In ZMQ_STREAM, we send [Routing ID][Data]. while (true) { // Send Routing ID frame - int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); + int rc = + zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); if (rc == -1) break; @@ -61,19 +62,26 @@ void test_stream_hwm_disconnect () // Verify that we actually reached the HWM TEST_ASSERT_EQUAL_INT (EAGAIN, errno); - // TEST: Attempt to disconnect the client by sending the Routing ID - // followed by a 0-byte payload. - // Before the fix, the first frame (Routing ID) would fail with EAGAIN. + // TEST: Attempt to disconnect the client. + // If the loop above ended after the ID frame but before the data frame, + // the socket is in a 'more' state. We handle both scenarios. int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); - TEST_ASSERT_EQUAL_INT ((int) id_size, rc); - // The second frame (0-byte) should trigger the termination logic - rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); - TEST_ASSERT_EQUAL_INT (0, rc); + if (rc == -1 && errno == EAGAIN) { + // Socket is mid-message (waiting for payload). + // Send 0-byte frame to disconnect. + rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (0, rc); + } else { + // Socket was at message boundary. Send ID then 0-byte frame. + TEST_ASSERT_EQUAL_INT ((int) id_size, rc); + rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (0, rc); + } // Cleanup resources TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&routing_id)); - close (fd); // Standard POSIX close as seen in other test files + close (fd); test_context_socket_close (stream); } From c5f0e8cd2c0416f0eb969c65d685a6912d4b059f Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Sun, 18 Jan 2026 20:57:35 -0800 Subject: [PATCH 5/6] Problem: ZMQ_STREAM sockets can get stuck trying to disconnect when hwm is at limit Solution: Implement zmq_disconnect_peer support for ZMQ_STREAM --- Makefile.am | 10 +- doc/zmq_disconnect_peer.adoc | 2 +- src/stream.cpp | 36 ++++++-- src/stream.hpp | 1 + tests/CMakeLists.txt | 2 +- tests/test_stream_disconnect_peer.cpp | 128 ++++++++++++++++++++++++++ tests/test_stream_hwm_disconnect.cpp | 95 ------------------- 7 files changed, 166 insertions(+), 108 deletions(-) create mode 100644 tests/test_stream_disconnect_peer.cpp delete mode 100644 tests/test_stream_hwm_disconnect.cpp diff --git a/Makefile.am b/Makefile.am index f2effe1f57..3e2bf52c2a 100755 --- a/Makefile.am +++ b/Makefile.am @@ -460,8 +460,8 @@ test_apps = \ tests/test_probe_router \ tests/test_stream \ tests/test_stream_empty \ + tests/test_stream_disconnect_peer \ tests/test_stream_disconnect \ - tests/test_stream_hwm_disconnect \ tests/test_stream_timeout \ tests/test_disconnect_inproc \ tests/test_unbind_wildcard \ @@ -636,14 +636,14 @@ tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp tests_test_stream_timeout_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_stream_timeout_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_stream_disconnect_peer_SOURCES = tests/test_stream_disconnect_peer.cpp +tests_test_stream_disconnect_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_stream_disconnect_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS} -tests_test_stream_hwm_disconnect_SOURCES = tests/test_stream_hwm_disconnect.cpp -tests_test_stream_hwm_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la -tests_test_stream_hwm_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS} - tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp tests_test_disconnect_inproc_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_disconnect_inproc_CPPFLAGS = ${TESTUTIL_CPPFLAGS} diff --git a/doc/zmq_disconnect_peer.adoc b/doc/zmq_disconnect_peer.adoc index f76429adcb..6f857cba7e 100644 --- a/doc/zmq_disconnect_peer.adoc +++ b/doc/zmq_disconnect_peer.adoc @@ -16,7 +16,7 @@ to send messages addressed with that 'routing_id' will fail with 'EHOSTUNREACH' until a new connection with a different 'routing_id' is established. This function is supported on socket types that manage per-peer routing ids: -'ZMQ_SERVER' and 'ZMQ_PEER'. Calling it on other socket types will fail with +'ZMQ_SERVER', 'ZMQ_PEER' and `ZMQ_STREAM`. Calling it on other socket types will fail with 'ENOTSUP'. diff --git a/src/stream.cpp b/src/stream.cpp index 0d852e2516..11983f9b3e 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -79,6 +79,9 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = out_pipe->pipe; if (!_current_out->check_write ()) { out_pipe->active = false; + _current_out = NULL; + errno = EAGAIN; + return -1; } } else { errno = EHOSTUNREACH; @@ -116,11 +119,6 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = NULL; return 0; } - if (!_current_out->check_write ()) { - _more_out = true; - errno = EAGAIN; - return -1; - } const bool ok = _current_out->write (msg_); if (likely (ok)) _current_out->flush (); @@ -241,6 +239,32 @@ bool zmq::stream_t::xhas_out () return true; } +int zmq::stream_t::xdisconnect_peer (uint32_t routing_id_) +{ + unsigned char buffer[5]; + buffer[0] = 0; + put_uint32 (buffer + 1, routing_id_); + + blob_t routing_id; + routing_id.set (buffer, sizeof buffer); + + out_pipe_t *out_pipe = lookup_out_pipe (routing_id); + if (!out_pipe) { + errno = EHOSTUNREACH; + return -1; + } + + out_pipe->pipe->terminate (false); + + // if currently writing to this pipe at same time, reset _current_out and _more_out + if (out_pipe->pipe == _current_out) { + _current_out = NULL; + _more_out = false; + } + + return 0; +} + void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) { // Always assign routing id for raw-socket @@ -263,4 +287,4 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) } pipe_->set_router_socket_routing_id (routing_id); add_out_pipe (ZMQ_MOVE (routing_id), pipe_); -} +} \ No newline at end of file diff --git a/src/stream.hpp b/src/stream.hpp index f8eb9901e3..1485e138ba 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -29,6 +29,7 @@ class stream_t ZMQ_FINAL : public routing_socket_base_t void xread_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xdisconnect_peer (uint32_t routing_id_) ZMQ_OVERRIDE; private: // Generate peer's id and update lookup map diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 871c85a1f6..9e4030c8d0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -26,8 +26,8 @@ set(tests test_probe_router test_stream test_stream_empty + test_stream_disconnect_peer test_stream_disconnect - test_stream_hwm_disconnect test_disconnect_inproc test_unbind_wildcard test_ctx_options diff --git a/tests/test_stream_disconnect_peer.cpp b/tests/test_stream_disconnect_peer.cpp new file mode 100644 index 0000000000..895d2987e7 --- /dev/null +++ b/tests/test_stream_disconnect_peer.cpp @@ -0,0 +1,128 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#define ZMQ_BUILD_DRAFT_API + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +#if defined ZMQ_HAVE_WINDOWS +#include +#else +#include +#endif + +SETUP_TEARDOWN_TESTCONTEXT + +// Helper to extract numeric host ID from the 5-byte ZMQ_STREAM frame [0x00][uint32] +static uint32_t extract_id (zmq_msg_t *msg_) +{ + TEST_ASSERT_EQUAL_INT (5, zmq_msg_size (msg_)); + const unsigned char *id_ptr = (const unsigned char *) zmq_msg_data (msg_); + uint32_t net_id; + memcpy (&net_id, id_ptr + 1, 4); + return ntohl (net_id); +} + +static void test_stream_disconnect_peer () +{ + char my_endpoint[MAX_SOCKET_STRING]; + + // We'll be using this socket to test the surgical disconnect API + void *stream = test_context_socket (ZMQ_STREAM); + + // Set timeouts to prevent the test from hanging indefinitely on failure + int timeout = 500; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (stream, ZMQ_SNDTIMEO, &timeout, sizeof (timeout))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (stream, ZMQ_RCVTIMEO, &timeout, sizeof (timeout))); + + bind_loopback_ipv4 (stream, my_endpoint, sizeof (my_endpoint)); + + // Connect two distinct clients to test isolation and state reset + fd_t fd_a = connect_socket (my_endpoint); + fd_t fd_b = connect_socket (my_endpoint); + + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + + // Peer A Setup: Receive connection notification + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); + uint32_t id_a_numeric = extract_id (&msg); + unsigned char id_a_raw[5]; + memcpy (id_a_raw, zmq_msg_data (&msg), 5); + TEST_ASSERT_EQUAL_INT ( + 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0))); + + // Peer B Setup: Receive connection notification + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); + uint32_t id_b_numeric = extract_id (&msg); + unsigned char id_b_raw[5]; + memcpy (id_b_raw, zmq_msg_data (&msg), 5); + TEST_ASSERT_EQUAL_INT ( + 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0))); + + // Verify Peer IDs are unique + TEST_ASSERT_NOT_EQUAL (id_a_numeric, id_b_numeric); + + // --- CASE 1: THE DIRTY RESET --- + // Start a multi-part message to Peer A. + // This locks the socket state machine (_more_out = true, _current_out = Pipe A). + TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE)); + + // Use the new API to surgically disconnect Peer A. + // This must force-reset the internal 'more' state and NULL the current pipe. + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect_peer (stream, id_a_numeric)); + msleep (SETTLE_TIME); + + // Attempt to talk to Peer B immediately. + // If the reset failed, this would misroute the ID frame as data for Peer A. + TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_b_raw, 5, ZMQ_SNDMORE)); + TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, "HELLO", 5, 0)); + + // Verify Peer B actually received the data via raw TCP + char recv_buf[5]; + int bytes = recv (fd_b, recv_buf, 5, 0); + TEST_ASSERT_EQUAL_INT (5, bytes); + TEST_ASSERT_EQUAL_STRING_LEN ("HELLO", recv_buf, 5); + + // --- CASE 2: SURGICAL ISOLATION --- + // Verify Peer A is gone from the routing table; sending to it should fail. + int rc = zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno); + + // --- CASE 3: INBOUND INTEGRITY --- + // Ensure Peer B can still send data to the server (FQ remains intact). + const char *ping = "PING"; + send (fd_b, ping, 4, 0); + msleep (SETTLE_TIME); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); + TEST_ASSERT_EQUAL_INT (id_b_numeric, extract_id (&msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); + TEST_ASSERT_EQUAL_STRING_LEN (ping, (char *) zmq_msg_data (&msg), 4); + + // --- CASE 4: ERROR HANDLING --- + // Attempt to disconnect a non-existent ID + rc = zmq_disconnect_peer (stream, 0x12345678); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno); + + // Cleanup + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + close (fd_a); + close (fd_b); + test_context_socket_close (stream); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_stream_disconnect_peer); + return UNITY_END (); +} \ No newline at end of file diff --git a/tests/test_stream_hwm_disconnect.cpp b/tests/test_stream_hwm_disconnect.cpp deleted file mode 100644 index 23e893e86d..0000000000 --- a/tests/test_stream_hwm_disconnect.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* SPDX-License-Identifier: MPL-2.0 */ - -#include "testutil.hpp" -#include "testutil_unity.hpp" - -#include - -SETUP_TEARDOWN_TESTCONTEXT - -void test_stream_hwm_disconnect () -{ - void *stream = test_context_socket (ZMQ_STREAM); - char endpoint[MAX_SOCKET_STRING]; - - // Set a low Send High Water Mark to trigger the issue quickly - int sndhwm = 3; - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_setsockopt (stream, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm))); - - // Bind the STREAM socket to a loopback address - bind_loopback_ipv4 (stream, endpoint, sizeof (endpoint)); - - // Connect a raw TCP socket to the ZMQ_STREAM socket - fd_t fd = connect_socket (endpoint); - - // STREAM socket receives two frames on connection: - // 1. The routing ID of the new peer - zmq_msg_t routing_id; - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); - - // Store routing ID for later use in disconnection - size_t id_size = zmq_msg_size (&routing_id); - void *id_data = zmq_msg_data (&routing_id); - - // 2. An empty frame (connection notification) - TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); - zmq_msg_t empty; - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&empty)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&empty, stream, 0)); - TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&empty)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&empty)); - - // Fill the outgoing pipe until it hits the High Water Mark. - // In ZMQ_STREAM, we send [Routing ID][Data]. - while (true) { - // Send Routing ID frame - int rc = - zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); - if (rc == -1) - break; - - // Send a large data frame to fill the buffer - zmq_msg_t data; - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&data, 262144)); - rc = zmq_msg_send (&data, stream, ZMQ_DONTWAIT); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data)); - if (rc == -1) - break; - } - - // Verify that we actually reached the HWM - TEST_ASSERT_EQUAL_INT (EAGAIN, errno); - - // TEST: Attempt to disconnect the client. - // If the loop above ended after the ID frame but before the data frame, - // the socket is in a 'more' state. We handle both scenarios. - int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE); - - if (rc == -1 && errno == EAGAIN) { - // Socket is mid-message (waiting for payload). - // Send 0-byte frame to disconnect. - rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); - TEST_ASSERT_EQUAL_INT (0, rc); - } else { - // Socket was at message boundary. Send ID then 0-byte frame. - TEST_ASSERT_EQUAL_INT ((int) id_size, rc); - rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT); - TEST_ASSERT_EQUAL_INT (0, rc); - } - - // Cleanup resources - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&routing_id)); - close (fd); - test_context_socket_close (stream); -} - -int main (int, char **) -{ - setup_test_environment (); - - UNITY_BEGIN (); - RUN_TEST (test_stream_hwm_disconnect); - return UNITY_END (); -} \ No newline at end of file From 86005686fd317f52752356ce4ea2c3a10e8d74a7 Mon Sep 17 00:00:00 2001 From: Guy Cohen <110805121+guyco3@users.noreply.github.com> Date: Mon, 19 Jan 2026 03:32:09 -0800 Subject: [PATCH 6/6] Problem: When a ZMQ_STREAM socket reaches its SNDHWM, it becomes impossible to disconnect a peer. Disconnecting requires sending the routing ID followed by a 0-byte frame. Currently, xsend returns EAGAIN on the first frame (the ID) if the pipe is full, preventing the second disconnect frame from ever being processed. Solution: Modified xsend in stream.cpp to allow the routing ID frame to pass even when the pipe is full. If the subsequent payload frame is 0-bytes, the connection is terminated immediately via terminate(). To prevent state-machine desync, _more_out is reset to false if a data-bearing payload frame is sent on a full pipe, forcing an EAGAIN and requiring a clean retry from the user. --- Makefile.am | 5 - doc/zmq_disconnect_peer.adoc | 2 +- src/stream.cpp | 36 ++------ src/stream.hpp | 1 - tests/CMakeLists.txt | 1 - tests/test_stream_disconnect_peer.cpp | 128 -------------------------- 6 files changed, 8 insertions(+), 165 deletions(-) delete mode 100644 tests/test_stream_disconnect_peer.cpp diff --git a/Makefile.am b/Makefile.am index 3e2bf52c2a..17dc1e6e16 100755 --- a/Makefile.am +++ b/Makefile.am @@ -460,7 +460,6 @@ test_apps = \ tests/test_probe_router \ tests/test_stream \ tests/test_stream_empty \ - tests/test_stream_disconnect_peer \ tests/test_stream_disconnect \ tests/test_stream_timeout \ tests/test_disconnect_inproc \ @@ -636,10 +635,6 @@ tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp tests_test_stream_timeout_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_stream_timeout_CPPFLAGS = ${TESTUTIL_CPPFLAGS} -tests_test_stream_disconnect_peer_SOURCES = tests/test_stream_disconnect_peer.cpp -tests_test_stream_disconnect_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la -tests_test_stream_disconnect_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS} - tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS} diff --git a/doc/zmq_disconnect_peer.adoc b/doc/zmq_disconnect_peer.adoc index 6f857cba7e..f76429adcb 100644 --- a/doc/zmq_disconnect_peer.adoc +++ b/doc/zmq_disconnect_peer.adoc @@ -16,7 +16,7 @@ to send messages addressed with that 'routing_id' will fail with 'EHOSTUNREACH' until a new connection with a different 'routing_id' is established. This function is supported on socket types that manage per-peer routing ids: -'ZMQ_SERVER', 'ZMQ_PEER' and `ZMQ_STREAM`. Calling it on other socket types will fail with +'ZMQ_SERVER' and 'ZMQ_PEER'. Calling it on other socket types will fail with 'ENOTSUP'. diff --git a/src/stream.cpp b/src/stream.cpp index 11983f9b3e..4c85625b6b 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -79,9 +79,6 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = out_pipe->pipe; if (!_current_out->check_write ()) { out_pipe->active = false; - _current_out = NULL; - errno = EAGAIN; - return -1; } } else { errno = EHOSTUNREACH; @@ -119,6 +116,13 @@ int zmq::stream_t::xsend (msg_t *msg_) _current_out = NULL; return 0; } + if (!_current_out->check_write ()) { + // Because we set _more_out to false above, the user is forced + // to resend the Identity frame on their next attempt. + _current_out = NULL; + errno = EAGAIN; + return -1; + } const bool ok = _current_out->write (msg_); if (likely (ok)) _current_out->flush (); @@ -239,32 +243,6 @@ bool zmq::stream_t::xhas_out () return true; } -int zmq::stream_t::xdisconnect_peer (uint32_t routing_id_) -{ - unsigned char buffer[5]; - buffer[0] = 0; - put_uint32 (buffer + 1, routing_id_); - - blob_t routing_id; - routing_id.set (buffer, sizeof buffer); - - out_pipe_t *out_pipe = lookup_out_pipe (routing_id); - if (!out_pipe) { - errno = EHOSTUNREACH; - return -1; - } - - out_pipe->pipe->terminate (false); - - // if currently writing to this pipe at same time, reset _current_out and _more_out - if (out_pipe->pipe == _current_out) { - _current_out = NULL; - _more_out = false; - } - - return 0; -} - void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) { // Always assign routing id for raw-socket diff --git a/src/stream.hpp b/src/stream.hpp index 1485e138ba..f8eb9901e3 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -29,7 +29,6 @@ class stream_t ZMQ_FINAL : public routing_socket_base_t void xread_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xdisconnect_peer (uint32_t routing_id_) ZMQ_OVERRIDE; private: // Generate peer's id and update lookup map diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9e4030c8d0..12ac80c175 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -26,7 +26,6 @@ set(tests test_probe_router test_stream test_stream_empty - test_stream_disconnect_peer test_stream_disconnect test_disconnect_inproc test_unbind_wildcard diff --git a/tests/test_stream_disconnect_peer.cpp b/tests/test_stream_disconnect_peer.cpp deleted file mode 100644 index 895d2987e7..0000000000 --- a/tests/test_stream_disconnect_peer.cpp +++ /dev/null @@ -1,128 +0,0 @@ -/* SPDX-License-Identifier: MPL-2.0 */ - -#define ZMQ_BUILD_DRAFT_API - -#include "testutil.hpp" -#include "testutil_unity.hpp" - -#include - -#if defined ZMQ_HAVE_WINDOWS -#include -#else -#include -#endif - -SETUP_TEARDOWN_TESTCONTEXT - -// Helper to extract numeric host ID from the 5-byte ZMQ_STREAM frame [0x00][uint32] -static uint32_t extract_id (zmq_msg_t *msg_) -{ - TEST_ASSERT_EQUAL_INT (5, zmq_msg_size (msg_)); - const unsigned char *id_ptr = (const unsigned char *) zmq_msg_data (msg_); - uint32_t net_id; - memcpy (&net_id, id_ptr + 1, 4); - return ntohl (net_id); -} - -static void test_stream_disconnect_peer () -{ - char my_endpoint[MAX_SOCKET_STRING]; - - // We'll be using this socket to test the surgical disconnect API - void *stream = test_context_socket (ZMQ_STREAM); - - // Set timeouts to prevent the test from hanging indefinitely on failure - int timeout = 500; - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_setsockopt (stream, ZMQ_SNDTIMEO, &timeout, sizeof (timeout))); - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_setsockopt (stream, ZMQ_RCVTIMEO, &timeout, sizeof (timeout))); - - bind_loopback_ipv4 (stream, my_endpoint, sizeof (my_endpoint)); - - // Connect two distinct clients to test isolation and state reset - fd_t fd_a = connect_socket (my_endpoint); - fd_t fd_b = connect_socket (my_endpoint); - - zmq_msg_t msg; - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); - - // Peer A Setup: Receive connection notification - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); - uint32_t id_a_numeric = extract_id (&msg); - unsigned char id_a_raw[5]; - memcpy (id_a_raw, zmq_msg_data (&msg), 5); - TEST_ASSERT_EQUAL_INT ( - 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0))); - - // Peer B Setup: Receive connection notification - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); - uint32_t id_b_numeric = extract_id (&msg); - unsigned char id_b_raw[5]; - memcpy (id_b_raw, zmq_msg_data (&msg), 5); - TEST_ASSERT_EQUAL_INT ( - 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0))); - - // Verify Peer IDs are unique - TEST_ASSERT_NOT_EQUAL (id_a_numeric, id_b_numeric); - - // --- CASE 1: THE DIRTY RESET --- - // Start a multi-part message to Peer A. - // This locks the socket state machine (_more_out = true, _current_out = Pipe A). - TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE)); - - // Use the new API to surgically disconnect Peer A. - // This must force-reset the internal 'more' state and NULL the current pipe. - TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect_peer (stream, id_a_numeric)); - msleep (SETTLE_TIME); - - // Attempt to talk to Peer B immediately. - // If the reset failed, this would misroute the ID frame as data for Peer A. - TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_b_raw, 5, ZMQ_SNDMORE)); - TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, "HELLO", 5, 0)); - - // Verify Peer B actually received the data via raw TCP - char recv_buf[5]; - int bytes = recv (fd_b, recv_buf, 5, 0); - TEST_ASSERT_EQUAL_INT (5, bytes); - TEST_ASSERT_EQUAL_STRING_LEN ("HELLO", recv_buf, 5); - - // --- CASE 2: SURGICAL ISOLATION --- - // Verify Peer A is gone from the routing table; sending to it should fail. - int rc = zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE); - TEST_ASSERT_EQUAL_INT (-1, rc); - TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno); - - // --- CASE 3: INBOUND INTEGRITY --- - // Ensure Peer B can still send data to the server (FQ remains intact). - const char *ping = "PING"; - send (fd_b, ping, 4, 0); - msleep (SETTLE_TIME); - - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); - TEST_ASSERT_EQUAL_INT (id_b_numeric, extract_id (&msg)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)); - TEST_ASSERT_EQUAL_STRING_LEN (ping, (char *) zmq_msg_data (&msg), 4); - - // --- CASE 4: ERROR HANDLING --- - // Attempt to disconnect a non-existent ID - rc = zmq_disconnect_peer (stream, 0x12345678); - TEST_ASSERT_EQUAL_INT (-1, rc); - TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno); - - // Cleanup - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); - close (fd_a); - close (fd_b); - test_context_socket_close (stream); -} - -int main (void) -{ - setup_test_environment (); - - UNITY_BEGIN (); - RUN_TEST (test_stream_disconnect_peer); - return UNITY_END (); -} \ No newline at end of file