From cf8f5c69a483e43f4fee5b36eb805f188d8282aa Mon Sep 17 00:00:00 2001 From: Andrey Vostrikov Date: Tue, 11 Oct 2022 11:45:09 +0300 Subject: [PATCH 1/5] Fill NNG_PKGS to request nng dependencies from installed location. Signed-off-by: Andrey Vostrikov --- cmake/FindmbedTLS.cmake | 2 +- cmake/NNGHelpers.cmake | 10 +++++++++- src/platform/posix/CMakeLists.txt | 4 ++-- src/sp/transport/zerotier/CMakeLists.txt | 2 +- src/supplemental/tls/mbedtls/CMakeLists.txt | 2 +- 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cmake/FindmbedTLS.cmake b/cmake/FindmbedTLS.cmake index f01039d6c..804a03927 100644 --- a/cmake/FindmbedTLS.cmake +++ b/cmake/FindmbedTLS.cmake @@ -82,5 +82,5 @@ endif () string(REGEX REPLACE ".*MBEDTLS_VERSION_STRING[\t ]+\"(.*)\"" "\\1" MBEDTLS_VERSION ${_MBEDTLS_VERLINE}) find_package_handle_standard_args(mbedTLS - REQUIRED_VARS MBEDTLS_TLS_LIBRARY MBEDTLS_CRYPTO_LIBRARY MBEDTLS_X509_LIBRARY MBEDTLS_INCLUDE_DIR VERSION_VAR MBEDTLS_VERSION) + REQUIRED_VARS MBEDTLS_TLS_LIBRARY MBEDTLS_CRYPTO_LIBRARY MBEDTLS_X509_LIBRARY MBEDTLS_INCLUDE_DIR MBEDTLS_LIBRARIES VERSION_VAR MBEDTLS_VERSION) diff --git a/cmake/NNGHelpers.cmake b/cmake/NNGHelpers.cmake index d7596827b..d97d800c3 100644 --- a/cmake/NNGHelpers.cmake +++ b/cmake/NNGHelpers.cmake @@ -41,6 +41,14 @@ function(nng_defines) target_compile_definitions(nng_private INTERFACE ${ARGN}) endfunction() +# nng_find_package looks up required package and adds dependency to the cmake config. +macro(nng_find_package PACKAGE_NAME) + find_package(${PACKAGE_NAME} REQUIRED) + list(APPEND NNG_PKGS ${PACKAGE_NAME}) + list(REMOVE_DUPLICATES NNG_PKGS) + set(NNG_PKGS ${NNG_PKGS} CACHE INTERNAL "nng package dependencies" FORCE) +endmacro() + # nng_link_libraries adds link dependencies to the libraries. function(nng_link_libraries) target_link_libraries(nng PRIVATE ${ARGN}) @@ -156,4 +164,4 @@ endfunction(nng_check_struct_member) macro(nng_directory DIR) set(NNG_TEST_PREFIX ${NNG_TEST_PREFIX}.${DIR}) -endmacro(nng_directory) \ No newline at end of file +endmacro(nng_directory) diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index 7b619fa22..dcfea2211 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -13,7 +13,7 @@ # the static library unless they also go into the dynamic. if (NNG_PLATFORM_POSIX) - find_package(Threads REQUIRED) + nng_find_package(Threads) nng_link_libraries(Threads::Threads) # Unconditionally declare the following feature test macros. These are @@ -108,4 +108,4 @@ if (NNG_PLATFORM_POSIX) nng_test(posix_ipcwinsec_test) -endif () \ No newline at end of file +endif () diff --git a/src/sp/transport/zerotier/CMakeLists.txt b/src/sp/transport/zerotier/CMakeLists.txt index 903b7f56f..dedc92965 100644 --- a/src/sp/transport/zerotier/CMakeLists.txt +++ b/src/sp/transport/zerotier/CMakeLists.txt @@ -28,7 +28,7 @@ if (NNG_TRANSPORT_ZEROTIER) Consult a lawyer and the license files for details. ************************************************************") - find_package(zerotiercore REQUIRED) + nng_find_package(zerotiercore) nng_link_libraries(zerotiercore::zerotiercore) nng_defines(NNG_TRANSPORT_ZEROTIER) diff --git a/src/supplemental/tls/mbedtls/CMakeLists.txt b/src/supplemental/tls/mbedtls/CMakeLists.txt index a0af30c3d..2d639efa6 100644 --- a/src/supplemental/tls/mbedtls/CMakeLists.txt +++ b/src/supplemental/tls/mbedtls/CMakeLists.txt @@ -23,7 +23,7 @@ if (NNG_TLS_ENGINE STREQUAL "mbed") if (TARGET mbedtls) nng_link_libraries(mbedtls) else() - find_package(mbedTLS REQUIRED) + nng_find_package(mbedTLS) nng_link_libraries(${MBEDTLS_LIBRARIES}) nng_include_directories(${MBEDTLS_INCLUDE_DIR}) endif() From e2230be0f87be112d51a02dc8d6435ee59a82d55 Mon Sep 17 00:00:00 2001 From: Dmitry Shifrin Date: Wed, 30 Nov 2022 11:48:05 +0300 Subject: [PATCH 2/5] sp: hub: Introduce hub proto Add hub protocol. Protocol with star topology and send/receive guarantees. Protocol is compatible with pair0 --- cmake/NNGOptions.cmake | 4 + docs/man/CMakeLists.txt | 2 + docs/man/libnng.3.adoc | 2 + docs/man/nng.7.adoc | 2 + docs/man/nng_hub.7.adoc | 58 +++ docs/man/nng_hub_open.3.adoc | 45 ++ include/nng/protocol/hub0/hub.h | 31 ++ src/sp/protocol/CMakeLists.txt | 2 + src/sp/protocol/hub0/CMakeLists.txt | 16 + src/sp/protocol/hub0/hub.c | 608 ++++++++++++++++++++++++++++ src/sp/protocol/hub0/hub_test.c | 457 +++++++++++++++++++++ 11 files changed, 1227 insertions(+) create mode 100644 docs/man/nng_hub.7.adoc create mode 100644 docs/man/nng_hub_open.3.adoc create mode 100644 include/nng/protocol/hub0/hub.h create mode 100644 src/sp/protocol/hub0/CMakeLists.txt create mode 100644 src/sp/protocol/hub0/hub.c create mode 100644 src/sp/protocol/hub0/hub_test.c diff --git a/cmake/NNGOptions.cmake b/cmake/NNGOptions.cmake index b8067bca0..14387e2c8 100644 --- a/cmake/NNGOptions.cmake +++ b/cmake/NNGOptions.cmake @@ -1,5 +1,6 @@ # # Copyright 2020 Staysail Systems, Inc. +# Copyright 2022 Cogent Embedded, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -46,6 +47,9 @@ mark_as_advanced(NNG_ENABLE_STATS) option (NNG_PROTO_BUS0 "Enable BUSv0 protocol." ON) mark_as_advanced(NNG_PROTO_BUS0) +option (NNG_PROTO_HUB0 "Enable HUBv0 protocol." ON) +mark_as_advanced(NNG_PROTO_HUB0) + option (NNG_PROTO_PAIR0 "Enable PAIRv0 protocol." ON) mark_as_advanced(NNG_PROTO_PAIR0) diff --git a/docs/man/CMakeLists.txt b/docs/man/CMakeLists.txt index a33d4ee8d..0336f3936 100644 --- a/docs/man/CMakeLists.txt +++ b/docs/man/CMakeLists.txt @@ -82,6 +82,7 @@ if (NNG_ENABLE_DOC) nng_aio_wait nng_alloc nng_bus_open + nng_hub_open nng_close nng_ctx_close nng_ctx_get @@ -379,6 +380,7 @@ if (NNG_ENABLE_DOC) set(NNG_MAN7 nng nng_bus + nng_hub nng_inproc nng_ipc nng_pair diff --git a/docs/man/libnng.3.adoc b/docs/man/libnng.3.adoc index eaeae24db..22bd4bebe 100644 --- a/docs/man/libnng.3.adoc +++ b/docs/man/libnng.3.adoc @@ -4,6 +4,7 @@ // Copyright 2018 Capitar IT Group BV // Copyright 2019 Devolutions // Copyright 2020 Dirac Research +// Copyright 2022 Cogent Embedded, Inc. // // This document is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -193,6 +194,7 @@ The following functions are used to construct a socket with a specific protocol: |=== |xref:nng_bus_open.3.adoc[nng_bus_open()]|open a bus socket +|xref:nng_hub_open.3.adoc[nng_hub_open()]|open a hub socket |xref:nng_pair_open.3.adoc[nng_pair_open()]|open a pair socket |xref:nng_pub_open.3.adoc[nng_pub_open()]|open a pub socket |xref:nng_pull_open.3.adoc[nng_pull_open()]|open a pull socket diff --git a/docs/man/nng.7.adoc b/docs/man/nng.7.adoc index 1097a5f8a..5e3847538 100644 --- a/docs/man/nng.7.adoc +++ b/docs/man/nng.7.adoc @@ -1,5 +1,6 @@ = nng(7) // +// Copyright 2022 Cogent Embedded, Inc. // Copyright 2019 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // @@ -54,6 +55,7 @@ other languages please check the http://nanomsg.org/[website]. [horizontal] xref:nng_bus.7.adoc[nng_bus(7)]:: Bus protocol +xref:nng_hub.7.adoc[nng_hub(7)]:: Hub protocol xref:nng_pair.7.adoc[nng_pair(7)]:: Pair protocol xref:nng_pub.7.adoc[nng_pub(7)]:: Publisher side of publish/subscribe protocol xref:nng_pull.7.adoc[nng_pull(7)]:: Pull side of pipeline protocol diff --git a/docs/man/nng_hub.7.adoc b/docs/man/nng_hub.7.adoc new file mode 100644 index 000000000..45ecae52c --- /dev/null +++ b/docs/man/nng_hub.7.adoc @@ -0,0 +1,58 @@ += nng_hub(7) +// +// Copyright 2022 Cogent Embedded, Inc. +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_hub - hub protocol + +== SYNOPSIS + +[source,c] +---- +#include +---- + +== DESCRIPTION + +(((protocol, _hub_))) +The ((_hub_ protocol)) provides for building star networks where +all peers are connected to one peer. +In this protocol, each message sent by a node is sent to every one of +its directly connected peers. + +All message delivery in this pattern is guaranteed. + +This protocol is compatible with pair0 + +=== Socket Operations + +The xref:nng_hub_open.3.adoc[`nng_hub0_open()`] functions create a hub socket. +This socket may be used to send and receive messages. +Sending messages will attempt to deliver to each connected peer. + +=== Protocol Versions + +Only version 0 of this protocol is supported. +(At the time of writing, no other versions of this protocol have been defined.) + +=== Protocol Options + +The _hub_ protocol has no protocol-specific options. + +=== Protocol Headers + +No message headers are present. + +== SEE ALSO + +[.text-left] +xref:nng_hub_open.3.adoc[nng_hub_open(3)], +xref:nng_pair.7.adoc[nng_pair], +xref:nng.7.adoc[nng(7)] diff --git a/docs/man/nng_hub_open.3.adoc b/docs/man/nng_hub_open.3.adoc new file mode 100644 index 000000000..d6f6521e9 --- /dev/null +++ b/docs/man/nng_hub_open.3.adoc @@ -0,0 +1,45 @@ += nng_hub_open(3) +// +// Copyright 2022 Cogent Embedded, Inc. +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_hub_open - create hub socket + +== SYNOPSIS + +[source,c] +---- +#include +#include + +int nng_hub0_open(nng_socket *s); +---- + +== DESCRIPTION + +The `nng_hub0_open()` function creates a xref:nng_hub.7.adoc[_hub_] version 0 +xref:nng_socket.5.adoc[socket] and returns it at the location pointed to by _s_. + +== RETURN VALUES + +These functions return 0 on success, and non-zero otherwise. + +== ERRORS + +[horizontal] +`NNG_ENOMEM`:: Insufficient memory is available. +`NNG_ENOTSUP`:: The protocol is not supported. + +== SEE ALSO + +[.text-left] +xref:nng_socket.5.adoc[nng_socket(5)], +xref:nng_hub.7.adoc[nng_hub(7)], +xref:nng.7.adoc[nng(7)] diff --git a/include/nng/protocol/hub0/hub.h b/include/nng/protocol/hub0/hub.h new file mode 100644 index 000000000..49d04b4a5 --- /dev/null +++ b/include/nng/protocol/hub0/hub.h @@ -0,0 +1,31 @@ +// +// Copyright 2022 Cogent Embedded, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// +#ifndef NNG_PROTOCOL_HUB0_HUB_H +#define NNG_PROTOCOL_HUB0_HUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define NNG_HUB0_SELF 0x10 +#define NNG_HUB0_PEER 0x10 +#define NNG_HUB0_SELF_NAME "hub" +#define NNG_HUB0_PEER_NAME "hub" + +NNG_DECL int nng_hub0_open(nng_socket *); + +#ifndef nng_hub_open +#define nng_hub_open nng_hub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* NNG_PROTOCOL_HUB0_HUB_H_ */ diff --git a/src/sp/protocol/CMakeLists.txt b/src/sp/protocol/CMakeLists.txt index fd4805237..a20ead29f 100644 --- a/src/sp/protocol/CMakeLists.txt +++ b/src/sp/protocol/CMakeLists.txt @@ -1,5 +1,6 @@ # # Copyright 2020 Staysail Systems, Inc. +# Copyright 2022 Cogent Embedded, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -11,6 +12,7 @@ nng_directory(protocol) add_subdirectory(bus0) +add_subdirectory(hub0) add_subdirectory(pair0) add_subdirectory(pair1) add_subdirectory(pipeline0) diff --git a/src/sp/protocol/hub0/CMakeLists.txt b/src/sp/protocol/hub0/CMakeLists.txt new file mode 100644 index 000000000..3ae6c0d93 --- /dev/null +++ b/src/sp/protocol/hub0/CMakeLists.txt @@ -0,0 +1,16 @@ +# +# Copyright 2022 Cogent Embedded, Inc. +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +nng_directory(hub0) + +nng_sources_if(NNG_PROTO_HUB0 hub.c) +nng_headers_if(NNG_PROTO_HUB0 nng/protocol/hub0/hub.h) +nng_defines_if(NNG_PROTO_HUB0 NNG_HAVE_HUB0) + +nng_test(hub_test) \ No newline at end of file diff --git a/src/sp/protocol/hub0/hub.c b/src/sp/protocol/hub0/hub.c new file mode 100644 index 000000000..457310bf0 --- /dev/null +++ b/src/sp/protocol/hub0/hub.c @@ -0,0 +1,608 @@ +// +// Copyright 2022 Cogent Embedded, Inc. +// Copyright 2021 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// +#include +#include + +#include "core/nng_impl.h" +#include "nng/protocol/hub0/hub.h" +#include + +#ifndef NNI_PROTO_HUB_V0 +#define NNI_PROTO_HUB_V0 NNI_PROTO(1, 0) +#endif + +typedef struct hub0_pipe hub0_pipe; +typedef struct hub0_sock hub0_sock; + +static void hub0_sock_send(void *, nni_aio *); +static void hub0_sock_recv(void *, nni_aio *); + +static void hub0_pipe_recv(hub0_pipe *); + +static void hub0_pipe_send_cb(void *); +static void hub0_pipe_recv_cb(void *); + +// hub0_sock is our per-socket protocol private structure. +struct hub0_sock { + nni_list pipes; + nni_mtx mtx; + nni_pollable can_send; + nni_pollable can_recv; + nni_lmq recv_msgs; + nni_list recv_wait; + int send_buf; + nni_list waq; +}; + +// hub0_pipe is our per-pipe protocol private structure. +struct hub0_pipe { + nni_pipe *pipe; + hub0_sock *hub; + nni_lmq send_queue; + nni_list_node node; + bool busy; + bool read_ready; + nni_aio aio_recv; + nni_aio aio_send; +}; + +static void +hub0_sock_fini(void *arg) +{ + hub0_sock *s = arg; + + nni_mtx_fini(&s->mtx); + nni_pollable_fini(&s->can_send); + nni_pollable_fini(&s->can_recv); + nni_lmq_fini(&s->recv_msgs); +} + +static void +hub0_sock_init(void *arg, nni_sock *ns) +{ + hub0_sock *s = arg; + + NNI_ARG_UNUSED(ns); + + NNI_LIST_INIT(&s->pipes, hub0_pipe, node); + nni_mtx_init(&s->mtx); + nni_aio_list_init(&s->recv_wait); + nni_pollable_init(&s->can_send); + nni_pollable_init(&s->can_recv); + nni_lmq_init(&s->recv_msgs, 16); + s->send_buf = 16; + nni_aio_list_init(&s->waq); +} + +static void +hub0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +hub0_sock_close(void *arg) +{ + hub0_sock *s = arg; + nni_aio *aio; + + nni_mtx_lock(&s->mtx); + + while ((aio = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + while ((aio = nni_list_first(&s->recv_wait)) != NULL) { + nni_list_remove(&s->recv_wait, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); +} + +static void +hub0_send_cancel(nng_aio *aio, void *arg, int rv) +{ + hub0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + +static void +hub0_pipe_send(hub0_pipe *pipe, nni_msg *msg) +{ + if (!pipe->busy) { + pipe->busy = true; + nni_msg_clone(msg); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); + } else if (!nni_lmq_full(&pipe->send_queue)) { + nni_msg_clone(msg); + nni_lmq_put(&pipe->send_queue, msg); + } +} + +static bool +hub0_is_writable(hub0_sock *s) +{ + hub0_pipe *pipe; + + NNI_LIST_FOREACH (&s->pipes, pipe) { + //hub is writable if all pipes are writable + if (nni_lmq_full(&pipe->send_queue)) { + return false; + } + } + return true; +} + +static void +hub0_sched_send(hub0_sock *s) +{ + nni_aio *aio; + hub0_pipe *pipe; + int rv; + nni_msg *msg; + size_t len; + + nni_mtx_lock(&s->mtx); + + if (!hub0_is_writable(s)){ + nni_mtx_unlock(&s->mtx); + return; + } + + while (!nni_list_empty(&s->waq)){ + if ((aio = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(aio); + + if ((rv = nni_aio_schedule(aio, hub0_send_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + continue; + } + + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + + NNI_LIST_FOREACH (&s->pipes, pipe) { + hub0_pipe_send(pipe, msg); + } + + nni_msg_free(msg); + nni_aio_finish(aio, 0, len); + break; + } + } + + if (hub0_is_writable(s)) { + nni_pollable_raise(&s->can_send); + } else { + nni_pollable_clear(&s->can_send); + } + + nni_mtx_unlock(&s->mtx); +} + +static void +hub0_pipe_stop(void *arg) +{ + hub0_pipe *p = arg; + hub0_sock *s = p->hub; + + nni_mtx_lock(&s->mtx); + p->busy = true; + + if (p->read_ready) { + nni_msg *m = nni_aio_get_msg(&p->aio_recv); + nni_msg_free(m); + p->read_ready = false; + } + + nni_mtx_unlock(&s->mtx); + + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); +} + +static void +hub0_pipe_fini(void *arg) +{ + hub0_pipe *p = arg; + + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); + nni_lmq_fini(&p->send_queue); +} + +static int +hub0_pipe_init(void *arg, nni_pipe *np, void *s) +{ + hub0_pipe *p = arg; + + p->pipe = np; + p->hub = s; + NNI_LIST_NODE_INIT(&p->node); + nni_aio_init(&p->aio_send, hub0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, hub0_pipe_recv_cb, p); + nni_lmq_init(&p->send_queue, p->hub->send_buf); + + return (0); +} + +static int +hub0_pipe_start(void *arg) +{ + hub0_pipe *p = arg; + hub0_sock *s = p->hub; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_HUB_V0) { + return (NNG_EPROTO); + } + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + p->busy = false; + p->read_ready = false; + nni_mtx_unlock(&s->mtx); + + hub0_sched_send(s); + + hub0_pipe_recv(p); + + return (0); +} + +static void +hub0_pipe_close(void *arg) +{ + hub0_pipe *p = arg; + hub0_sock *s = p->hub; + + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); + + nni_mtx_lock(&s->mtx); + nni_lmq_flush(&p->send_queue); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +hub0_pipe_send_cb(void *arg) +{ + hub0_pipe *p = arg; + hub0_sock *s = p->hub; + nni_msg *msg; + + if (nni_aio_result(&p->aio_send) != 0) { + // closed? + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); + nni_pipe_close(p->pipe); + return; + } + + nni_mtx_lock(&s->mtx); + if (nni_lmq_get(&p->send_queue, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&s->mtx); + + hub0_sched_send(s); +} + +static void +hub0_pipe_recv_cb(void *arg) +{ + hub0_pipe *p = arg; + hub0_sock *s = p->hub; + nni_aio *aio = NULL; + nni_msg *msg; + + if (nni_aio_result(&p->aio_recv) != 0) { + nni_pipe_close(p->pipe); + return; + } + + msg = nni_aio_get_msg(&p->aio_recv); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + nni_mtx_lock(&s->mtx); + + if (!nni_list_empty(&s->recv_wait)) { + aio = nni_list_first(&s->recv_wait); + nni_aio_list_remove(aio); + nni_aio_set_msg(aio, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + } else if (nni_lmq_put(&s->recv_msgs, msg) == 0) { + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pollable_raise(&s->can_recv); + } else { + p->read_ready = true; + nni_pollable_raise(&s->can_recv); + } + + if (!p->read_ready) { + hub0_pipe_recv(p); + } + + nni_mtx_unlock(&s->mtx); + + if (aio != NULL) { + nni_aio_finish_sync(aio, 0, nni_msg_len(msg)); + } +} + +static void +hub0_pipe_recv(hub0_pipe *p) +{ + nni_pipe_recv(p->pipe, &p->aio_recv); +} + +static void +hub0_recv_activate(hub0_sock *s) +{ + nni_msg *msg; + hub0_pipe *pipe; + + // Inform all pipes that we are ready to + // receive messages + NNI_LIST_FOREACH (&s->pipes, pipe) { + if (pipe->read_ready) { + msg = nni_aio_get_msg(&pipe->aio_recv); + nni_msg_set_pipe(msg, nni_pipe_id(pipe->pipe)); + + if (nni_lmq_put(&s->recv_msgs, msg) == 0) { + pipe->read_ready = false; + nni_aio_set_msg(&pipe->aio_recv, NULL); + nni_pollable_raise(&s->can_recv); + hub0_pipe_recv(pipe); + } else { + break; + } + } + } +} + +static void +hub0_sock_send(void *arg, nni_aio *aio) +{ + hub0_sock *s = arg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); + nni_aio_list_append(&s->waq, aio); + nni_mtx_unlock(&s->mtx); + + hub0_sched_send(s); +} + +static void +hub0_recv_cancel(nng_aio *aio, void *arg, int rv) +{ + hub0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + +static void +hub0_sock_recv(void *arg, nni_aio *aio) +{ + hub0_sock *s = arg; + nni_msg *msg; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); +again: + if (nni_lmq_empty(&s->recv_msgs)) { + int rv; + if ((rv = nni_aio_schedule(aio, hub0_recv_cancel, s)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&s->recv_wait, aio); + nni_mtx_unlock(&s->mtx); + return; + } + + (void) nni_lmq_get(&s->recv_msgs, &msg); + + if (nni_lmq_empty(&s->recv_msgs)) { + nni_pollable_clear(&s->can_recv); + } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + nni_aio_set_msg(aio, msg); + + hub0_recv_activate(s); + + nni_mtx_unlock(&s->mtx); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static int +hub0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t) +{ + hub0_sock *sock = arg; + int fd; + int rv; + + rv = nni_pollable_getfd(&sock->can_send, &fd); + if (rv == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); +} + +static int +hub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + hub0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); +} + +static int +hub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + hub0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = (int) nni_lmq_cap(&s->recv_msgs); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +hub0_sock_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_type t) +{ + hub0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = s->send_buf; + nni_mtx_unlock(&s->mtx); + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +hub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + hub0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + if ((rv = nni_lmq_resize(&s->recv_msgs, (size_t) val)) != 0) { + nni_mtx_unlock(&s->mtx); + return (rv); + } + + nni_mtx_unlock(&s->mtx); + return (0); +} + +static int +hub0_sock_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + hub0_sock *s = arg; + hub0_pipe *p; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&s->mtx); + s->send_buf = val; + NNI_LIST_FOREACH (&s->pipes, p) { + if ((rv = nni_lmq_resize(&p->send_queue, (size_t) val)) != 0) { + break; + } + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static nni_proto_pipe_ops hub0_pipe_ops = { + .pipe_size = sizeof(hub0_pipe), + .pipe_init = hub0_pipe_init, + .pipe_fini = hub0_pipe_fini, + .pipe_start = hub0_pipe_start, + .pipe_close = hub0_pipe_close, + .pipe_stop = hub0_pipe_stop, +}; + +static nni_option hub0_sock_options[] = { + { + .o_name = NNG_OPT_SENDFD, + .o_get = hub0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = hub0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = hub0_sock_get_recv_buf_len, + .o_set = hub0_sock_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = hub0_sock_get_send_buf_len, + .o_set = hub0_sock_set_send_buf_len, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_proto_sock_ops hub0_sock_ops = { + .sock_size = sizeof(hub0_sock), + .sock_init = hub0_sock_init, + .sock_fini = hub0_sock_fini, + .sock_open = hub0_sock_open, + .sock_close = hub0_sock_close, + .sock_send = hub0_sock_send, + .sock_recv = hub0_sock_recv, + .sock_options = hub0_sock_options, +}; + +static nni_proto hub0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_HUB_V0, "hub" }, + .proto_peer = { NNI_PROTO_HUB_V0, "hub" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_sock_ops = &hub0_sock_ops, + .proto_pipe_ops = &hub0_pipe_ops, +}; + +int +nng_hub0_open(nng_socket *id) +{ + return (nni_proto_open(id, &hub0_proto)); +} diff --git a/src/sp/protocol/hub0/hub_test.c b/src/sp/protocol/hub0/hub_test.c new file mode 100644 index 000000000..3a9d6e572 --- /dev/null +++ b/src/sp/protocol/hub0/hub_test.c @@ -0,0 +1,457 @@ +// +// Copyright 2022 Cogent Embedded, Inc. +// Copyright 2021 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +#include + +#define SECOND 1000 + +void +test_hub_identity(void) +{ + nng_socket s; + int p; + char *n; + + NUTS_PASS(nng_hub_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_HUB0_SELF); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_HUB0_PEER); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_HUB0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_HUB0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_hub_star(void) +{ + nng_socket s1, s2, s3; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_hub_open(&s2)); + NUTS_PASS(nng_hub_open(&s3)); + + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_socket_set_ms(s3, NNG_OPT_RECVTIMEO, SECOND)); + + NUTS_MARRY(s1, s2); + NUTS_MARRY(s1, s3); + + NUTS_SEND(s1, "one"); + NUTS_RECV(s2, "one"); + NUTS_RECV(s3, "one"); + + NUTS_SEND(s2, "two"); + NUTS_SEND(s1, "one"); + NUTS_RECV(s1, "two"); + NUTS_RECV(s2, "one"); + NUTS_RECV(s3, "one"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + NUTS_CLOSE(s3); +} + +static void +test_hub_compatible_pair(void) +{ + nng_socket s1, s2; + char *addr; + + NUTS_ADDR(addr, "inproc"); + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_pair0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_MARRY(s2, s1); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_hub_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_hub_open(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_hub_recv_cancel(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, SECOND); + nng_recv_aio(s1, aio); + nng_aio_abort(aio, NNG_ECANCELED); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(s1); + nng_aio_free(aio); +} + +static void +test_hub_close_recv_abort(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, SECOND); + nng_recv_aio(s1, aio); + NUTS_CLOSE(s1); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); +} + +static void +test_hub_aio_stopped(void) +{ + nng_socket s1; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_stop(aio); + + nng_recv_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + nng_aio_set_msg(aio, msg); + nng_send_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + nng_aio_free(aio); + nng_msg_free(msg); + NUTS_CLOSE(s1); +} + +static void +test_hub_send_no_pipes(void) +{ + nng_socket s1; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_SEND(s1, "DROP1"); + NUTS_SEND(s1, "DROP2"); + NUTS_CLOSE(s1); +} + +static void +test_hub_poll_readable(void) +{ + int fd; + nng_socket s1, s2; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_hub_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(s2, s1); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(s2, "abc"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(s1, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +static void +test_hub_poll_writable(void) +{ + int fd; + nng_socket s1, s2; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_hub_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not writable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_MARRY(s2, s1); + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_SEND(s1, "001"); // first one in the receiver queue + NUTS_SEND(s1, "002"); // second one in the receiver + NUTS_SEND(s1, "003"); + NUTS_SEND(s1, "004"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // and receiving makes it ready + NUTS_RECV(s2, "001"); + NUTS_SLEEP(100); // time for the sender to complete + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +static void +test_hub_recv_buf_option(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_RECVBUF; + + NUTS_PASS(nng_hub_open(&s)); + + NUTS_PASS(nng_socket_set_int(s, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s, opt, 3)); + NUTS_PASS(nng_socket_get_int(s, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(s); +} + +static void +test_hub_send_buf_option(void) +{ + nng_socket s1; + nng_socket s2; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_SENDBUF; + + NUTS_PASS(nng_hub_open(&s1)); + NUTS_PASS(nng_hub_open(&s2)); + NUTS_MARRY(s1, s2); + + NUTS_PASS(nng_socket_set_int(s1, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s1, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s1, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s1, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s1, opt, 3)); + NUTS_PASS(nng_socket_get_int(s1, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s1, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s1, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s1, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s1, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s1, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +#define SENDS 10 + +static void +test_hub_tx_drop(void) +{ + nng_socket hub1, hub2; + nng_aio **aio_array; + const char text[] = "abc"; + + + NUTS_PASS(nng_hub_open(&hub1)); + NUTS_PASS(nng_hub_open(&hub2)); + + NUTS_PASS(nng_socket_set_int(hub1, NNG_OPT_SENDBUF, 1)); + + NUTS_PASS(nng_socket_set_int(hub2, NNG_OPT_RECVBUF, SENDS + 1)); + NUTS_PASS(nng_socket_set_ms(hub2, NNG_OPT_RECVTIMEO, 10000)); + + NUTS_MARRY(hub1, hub2); + + aio_array = calloc(SENDS, sizeof(*aio_array)); + + for(unsigned i = 0u; i < SENDS; i++) { + nng_msg *msg; + void* buf; + + NUTS_PASS(nng_aio_alloc(&aio_array[i], NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, sizeof(text))); + buf = nng_msg_body(msg); + memcpy(buf, text, sizeof(text)); + nng_aio_set_msg(aio_array[i], msg); + } + + for(unsigned i = 0u; i < SENDS; i++) { + nng_send_aio(hub1, aio_array[i]); + } + + for(unsigned i = 0u; i < SENDS; i++) { + NUTS_RECV(hub2, text); + } + + for(unsigned i = 0u; i < SENDS; i++) { + nng_aio_wait(aio_array[i]); + nng_aio_stop(aio_array[i]); + nng_aio_free(aio_array[i]); + } + + NUTS_CLOSE(hub1); + NUTS_CLOSE(hub2); + free(aio_array); +} + + +static void +test_hub_rx_drop(void) +{ + nng_socket hub1, hub2; + nng_aio **aio_array; + const char text[] = "abc"; + + NUTS_PASS(nng_hub_open(&hub1)); + NUTS_PASS(nng_hub_open(&hub2)); + + NUTS_PASS(nng_socket_set_int(hub1, NNG_OPT_SENDBUF, SENDS + 1)); + + NUTS_PASS(nng_socket_set_int(hub2, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_socket_set_ms(hub2, NNG_OPT_RECVTIMEO, 10000)); + + NUTS_MARRY(hub1, hub2); + + aio_array = calloc(SENDS, sizeof(*aio_array)); + + for(unsigned i = 0u; i < SENDS; i++) { + nng_msg *msg; + void* buf; + + NUTS_PASS(nng_aio_alloc(&aio_array[i], NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, sizeof(text))); + buf = nng_msg_body(msg); + memcpy(buf, text, sizeof(text)); + nng_aio_set_msg(aio_array[i], msg); + } + + for(unsigned i = 0u; i < SENDS; i++) { + nng_send_aio(hub1, aio_array[i]); + } + + NUTS_SLEEP(100); + + for(unsigned i = 0u; i < SENDS; i++) { + NUTS_RECV(hub2, text); + } + + for(unsigned i = 0u; i < SENDS; i++) { + nng_aio_wait(aio_array[i]); + nng_aio_stop(aio_array[i]); + nng_aio_free(aio_array[i]); + } + + NUTS_CLOSE(hub1); + NUTS_CLOSE(hub2); + free(aio_array); +} + + +static void +test_hub_restart(void) +{ + nng_socket hub1, hub2; + const char text[] = "abc"; + + for (int i = 0; i < 1000; i++) { + NUTS_PASS(nng_hub_open(&hub1)); + NUTS_PASS(nng_hub_open(&hub2)); + + NUTS_MARRY(hub1, hub2); + + for(unsigned i = 0u; i < SENDS; i++) { + NUTS_SEND(hub1, text); + NUTS_RECV(hub2, text); + } + + NUTS_CLOSE(hub1); + NUTS_CLOSE(hub2); + } +} + +TEST_LIST = { + { "hub identity", test_hub_identity }, + { "hub star", test_hub_star }, + { "hub compatible pair", test_hub_compatible_pair }, + { "hub no context", test_hub_no_context }, + { "hub poll read", test_hub_poll_readable }, + { "hub poll write", test_hub_poll_writable }, + { "hub send no pipes", test_hub_send_no_pipes }, + { "hub recv cancel", test_hub_recv_cancel }, + { "hub close recv abort", test_hub_close_recv_abort }, + { "hub aio stopped", test_hub_aio_stopped }, + { "hub recv buf option", test_hub_recv_buf_option }, + { "hub send buf option", test_hub_send_buf_option }, + { "hub tx drop", test_hub_tx_drop }, + { "hub rx drop", test_hub_rx_drop }, + { "hub restart", test_hub_restart }, + { NULL, NULL }, +}; From 64f5b9910f431f4804974554a14a5c709a5c5288 Mon Sep 17 00:00:00 2001 From: Dmitry Shifrin Date: Fri, 2 Dec 2022 16:14:29 +0300 Subject: [PATCH 3/5] src: sp: tcp: Finish receive aio on close Finish receive aio on tcp pipe close --- src/sp/transport/tcp/tcp.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 624403a75..4391b5b69 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -361,6 +361,11 @@ tcptran_pipe_recv_cb(void *arg) goto recv_error; } + if (p->closed) { + rv = NNG_ECLOSED; + goto recv_error; + } + n = nni_aio_count(rxaio); nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) > 0) { From c0a0e34dfdb5db61a0604ca47f6e7b1d3719791a Mon Sep 17 00:00:00 2001 From: Andrey Vostrikov Date: Tue, 24 Jan 2023 17:54:04 +0300 Subject: [PATCH 4/5] Fix mbedTLS lookup via cmake in installed location Signed-off-by: Andrey Vostrikov --- cmake/nng-config.cmake.in | 11 ++++++++++- src/CMakeLists.txt | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cmake/nng-config.cmake.in b/cmake/nng-config.cmake.in index c54199614..adc23feff 100644 --- a/cmake/nng-config.cmake.in +++ b/cmake/nng-config.cmake.in @@ -18,7 +18,16 @@ include("${CMAKE_CURRENT_LIST_DIR}/nng-targets.cmake") # Make sure we find packages for our dependencies foreach(_PKG IN ITEMS @NNG_PKGS@) - find_package(${_PKG} REQUIRED) + if (${_PKG} STREQUAL "mbedTLS") + # Workaround to use FindmbedTLS.cmake from installed location + set(BACKUP_CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}) + get_filename_component(NNG_CONFIG_DIR "${CMAKE_CURRENT_LIST_FILE}" PATH) + set(CMAKE_MODULE_PATH ${NNG_CONFIG_DIR}) + find_package(${_PKG} REQUIRED) + set(CMAKE_MODULE_PATH ${BACKUP_CMAKE_MODULE_PATH}) + else() + find_package(${_PKG} REQUIRED) + endif() endforeach () set(NNG_LIBRARY nng::nng) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 20a7bef0c..2f9ad9573 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -94,6 +94,6 @@ configure_package_config_file(${PROJECT_SOURCE_DIR}/cmake/${PROJECT_NAME}-config INSTALL_DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" PATH_VARS INCLUDE_INSTALL_DIRS) -install(FILES "${project_config}" "${version_config}" +install(FILES "${project_config}" "${version_config}" ${PROJECT_SOURCE_DIR}/cmake/FindmbedTLS.cmake DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" COMPONENT Library) From 6273bf59139e38afcd7eb75f224e8f01944af23e Mon Sep 17 00:00:00 2001 From: Semyon Sylka Date: Fri, 7 Jun 2024 13:54:12 +0300 Subject: [PATCH 5/5] Wake all workers up on dispatch --- src/core/taskq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/taskq.c b/src/core/taskq.c index d914093bf..64224bcc8 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -170,7 +170,7 @@ nni_task_dispatch(nni_task *task) nni_mtx_lock(&tq->tq_mtx); nni_list_append(&tq->tq_tasks, task); - nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate + nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); }