From 4eb7df6c10ddc2c88f0267f50dc7bf03ce7c07be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 14 Apr 2026 18:41:39 +0200 Subject: [PATCH 1/3] Add per-request buffer_time option Commands can be sent with an optional map of request options: ered:command(Pid, Cmd, #{buffer_time => 10, timeout => 5000}). When buffer_time is non-zero, the command is buffered and a timer is started. Buffered commands are flushed when the timer fires. If an unbuffered command (buffer_time => 0, the default) arrives while there are buffered commands, everything is flushed immediately. This allows coalescing multiple commands into fewer TCP packets and TLS records. The request options map is supported in ered:command/3, ered:command_async/4, ered_cluster:command/4 and ered_cluster:command_async/5. Passing a plain timeout as before still works for backward compatibility. --- README.md | 37 ++++++++++++++++++++-- src/ered.erl | 14 +++++++-- src/ered_client.erl | 63 ++++++++++++++++++++++++++++++++------ src/ered_cluster.erl | 58 ++++++++++++++++++++--------------- test/ered_client_tests.erl | 50 ++++++++++++++++++++++++++++++ 5 files changed, 182 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 53ef8f9..3d61b39 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ Closes the connection. ```Erlang ered:command(client_ref(), command()) -> reply(). ered:command(client_ref(), command(), timeout()) -> reply(). +ered:command(client_ref(), command(), req_opts()) -> reply(). ``` Send a command and return the reply. @@ -95,10 +96,14 @@ work as expected. For cluster clients, a key must be provided. Omitting timeout is the same as setting the timeout to infinity. -### `ered:command_async/3` +The third argument can be a timeout or a map of request options. See [Request +options](#request-options) below. + +### `ered:command_async/3,4` ```Erlang ered:command_async(client_ref(), command(), fun((reply()) -> any())) -> ok. +ered:command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok. ``` Like command/2,3 but asynchronous. Instead of returning the reply, the reply @@ -142,6 +147,7 @@ cluster. ```Erlang ered_cluster:command(cluster_ref(), command(), key()) -> reply(). ered_cluster:command(cluster_ref(), command(), key(), timeout()) -> reply(). +ered_cluster:command(cluster_ref(), command(), key(), req_opts()) -> reply(). ``` Send a command. The command is routed to @@ -154,10 +160,14 @@ then they need to all map to the same slot for things to work as expected. Omitting timeout is the same as setting the timeout to infinity. -### `ered_cluster:command_async/4` +The fourth argument can be a timeout or a map of request options. See [Request +options](#request-options) below. + +### `ered_cluster:command_async/4,5` ```Erlang ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. +ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), req_opts()) -> ok. ``` Like command/3,4 but asynchronous. Instead of returning the reply, the reply @@ -370,6 +380,29 @@ options, as `{client_opts, [{connection_opts, [...]}]}`. When a timeout happens, the connection is closed and the client attempts to set up a new connection. See the client option `node_down_timeout` above. +Request options +--------------- + +The command functions accept an optional map of request options instead of a +plain timeout. The following keys are recognized: + +* `timeout => timeout()` + + Timeout for the `gen_server:call`. Same as passing a timeout directly. + Default infinity. + +* `buffer_time => non_neg_integer()` + + Buffer time in milliseconds. When non-zero, the command is not sent + immediately but buffered. A timer is started and the buffered commands are + flushed when the timer fires. If a command with `buffer_time => 0` (the + default) arrives while there are buffered commands, all buffered commands are + flushed immediately together with the new command. This can be used to + coalesce multiple commands into fewer TCP packets and TLS records. + + If multiple buffered commands arrive with different buffer times, the shortest + remaining time is used. + Info messages ------------- diff --git a/src/ered.erl b/src/ered.erl index 0510908..4bd7d92 100644 --- a/src/ered.erl +++ b/src/ered.erl @@ -7,7 +7,7 @@ -export([connect/3, close/1, command/2, command/3, - command_async/3]). + command_async/3, command_async/4]). -export_type([opt/0, addr/0, @@ -15,6 +15,7 @@ command/0, reply/0, reply_fun/0, + req_opts/0, client_ref/0]). %%%=================================================================== @@ -27,6 +28,7 @@ -type command() :: ered_command:command(). -type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. -type reply_fun() :: ered_client:reply_fun(). +-type req_opts() :: #{timeout => timeout(), buffer_time => non_neg_integer()}. -type client_ref() :: gen_server:server_ref(). %%%=================================================================== @@ -51,7 +53,7 @@ close(Pid) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(client_ref(), command()) -> reply(). --spec command(client_ref(), command(), timeout()) -> reply(). +-spec command(client_ref(), command(), timeout() | req_opts()) -> reply(). %% %% Send a command. %% If the command is a single command then it is represented as a @@ -64,10 +66,13 @@ close(Pid) -> command(Pid, Command) -> ered_client:command(Pid, Command, infinity). command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> - ered_client:command(Pid, Command, Timeout). + ered_client:command(Pid, Command, Timeout); +command(Pid, Command, Opts) when is_map(Opts) -> + ered_client:command(Pid, Command, Opts). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(client_ref(), command(), fun((reply()) -> any())) -> ok. +-spec command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok. %% %% Like command/2,3 but asynchronous. Instead of returning the reply, %% the reply function is applied to the reply when it is available. @@ -76,3 +81,6 @@ command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(Pid, Command, ReplyFun) when is_function(ReplyFun, 1) -> ered_client:command_async(Pid, Command, ReplyFun). + +command_async(Pid, Command, ReplyFun, Opts) when is_function(ReplyFun, 1), is_map(Opts) -> + ered_client:command_async(Pid, Command, ReplyFun, Opts). diff --git a/src/ered_client.erl b/src/ered_client.erl index 4045482..031832f 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -13,7 +13,7 @@ connect/3, close/1, deactivate/1, reactivate/1, command/2, command/3, - command_async/3]). + command_async/3, command_async/4]). %% testing/debugging -export([state_to_map/1]). @@ -83,6 +83,8 @@ status :: init | up | node_down | node_deactivated, node_down_timer = none :: none | reference(), connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond) + buffer_until = none :: none | integer(), % erlang:monotonic_time(millisecond) + buffer_timer = none :: none | reference(), opts = #opts{} }). @@ -258,7 +260,7 @@ reactivate(ServerRef) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(pid(), ered_command:command()) -> reply(). --spec command(pid(), ered_command:command(), timeout()) -> reply(). +-spec command(pid(), ered_command:command(), timeout() | ered:req_opts()) -> reply(). %% %% Send a command to the connected node. The argument can be a %% single command as a list of binaries, a pipeline of command as a @@ -267,11 +269,16 @@ reactivate(ServerRef) -> command(ServerRef, Command) -> command(ServerRef, Command, infinity). -command(ServerRef, Command, Timeout) -> - gen_server:call(ServerRef, {command, ered_command:convert_to(Command)}, Timeout). +command(ServerRef, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> + gen_server:call(ServerRef, {command, ered_command:convert_to(Command), 0}, Timeout); +command(ServerRef, Command, Opts) when is_map(Opts) -> + Timeout = maps:get(timeout, Opts, infinity), + BufferTime = maps:get(buffer_time, Opts, 0), + gen_server:call(ServerRef, {command, ered_command:convert_to(Command), BufferTime}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(pid(), ered_command:command(), reply_fun()) -> ok. +-spec command_async(pid(), ered_command:command(), reply_fun(), ered:req_opts()) -> ok. %% %% Send a command to the connected node in asynchronous %% fashion. The provided callback function will be called with the @@ -279,8 +286,13 @@ command(ServerRef, Command, Timeout) -> %% client process and should not hang or perform any lengthy task. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, CallbackFun) -> - gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), - replyto = CallbackFun}). + command_async(ServerRef, Command, CallbackFun, #{}). + +command_async(ServerRef, Command, CallbackFun, Opts) when is_map(Opts) -> + BufferTime = maps:get(buffer_time, Opts, 0), + gen_server:cast(ServerRef, {#command{data = ered_command:convert_to(Command), + replyto = CallbackFun}, + BufferTime}). %% Converts a state record to a map, for easier testing. %% Used in tests, after calling sys:get_state(EredClientPid). @@ -347,16 +359,16 @@ handle_connection_opts(OptsRecord, Opts) -> timeout = ResponseTimeout, push_cb = PushCb}. -handle_call({command, Command}, From, State) -> +handle_call({command, Command, BufferTime}, From, State) -> Fun = fun(Reply) -> gen_server:reply(From, Reply) end, - handle_cast(#command{data = Command, replyto = Fun}, State). + handle_cast({#command{data = Command, replyto = Fun}, BufferTime}, State). -handle_cast(Command = #command{}, State) -> +handle_cast({Command = #command{}, BufferTime}, State) -> case State#st.status of Up when Up =:= up; Up =:= init -> State1 = State#st{waiting = q_in(Command, State#st.waiting)}, - State2 = process_commands(State1), + State2 = maybe_buffer(BufferTime, State1), {noreply, State2, response_timeout(State2)}; NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated -> reply_command(Command, {error, NodeProblem}), @@ -456,6 +468,11 @@ handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.nod State2 = reply_all({error, node_down}, State1), {noreply, process_commands(State2#st{status = node_down})}; +handle_info({timeout, Ref, flush_buffer}, #st{buffer_timer = Ref} = State) -> + State1 = State#st{buffer_timer = none, buffer_until = none}, + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; + handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none -> %% Request timeout Transport = State#st.opts#opts.transport, @@ -807,6 +824,32 @@ response_timeout(State) when not ?q_is_empty(State#st.pending) -> response_timeout(_State) -> infinity. +maybe_buffer(0, State) -> + cancel_buffer_timer(process_commands(State)); +maybe_buffer(BufferTime, State) -> + Now = erlang:monotonic_time(millisecond), + Until = Now + BufferTime, + case State#st.buffer_until of + none -> + start_buffer_timer(Until, State); + Existing when Until < Existing -> + erlang:cancel_timer(State#st.buffer_timer), + start_buffer_timer(Until, State); + _Later -> + State + end. + +start_buffer_timer(Until, State) -> + Ms = max(1, Until - erlang:monotonic_time(millisecond)), + Ref = erlang:start_timer(Ms, self(), flush_buffer), + State#st{buffer_timer = Ref, buffer_until = Until}. + +cancel_buffer_timer(#st{buffer_timer = none} = State) -> + State; +cancel_buffer_timer(#st{buffer_timer = Ref} = State) -> + erlang:cancel_timer(Ref), + State#st{buffer_timer = none, buffer_until = none}. + reply_command(#command{replyto = Fun} = _Command, Reply) -> Fun(Reply). diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 5e4139c..574e346 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -9,7 +9,7 @@ %% API -export([connect/2, close/1, - command/3, command/4, command_async/4, + command/3, command/4, command_async/4, command_async/5, command_all/2, command_all/3, get_clients/1, get_addr_to_client_map/1, @@ -176,7 +176,7 @@ close(ClusterRef) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(cluster_ref(), command(), key()) -> reply(). --spec command(cluster_ref(), command(), key(), timeout()) -> reply(). +-spec command(cluster_ref(), command(), key(), timeout() | ered:req_opts()) -> reply(). %% %% Send a command to the cluster. The command will be routed to %% the correct cluster node client based on the provided key. @@ -191,20 +191,28 @@ close(ClusterRef) -> command(ClusterRef, Command, Key) -> command(ClusterRef, Command, Key, infinity). -command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> +command(ClusterRef, Command, Key, Opts) when is_map(Opts) -> C = ered_command:convert_to(Command), - gen_server:call(ClusterRef, {command, C, Key}, Timeout). + Timeout = maps:get(timeout, Opts, infinity), + gen_server:call(ClusterRef, {command, C, Key, Opts}, Timeout); +command(ClusterRef, Command, Key, Timeout) -> + C = ered_command:convert_to(Command), + gen_server:call(ClusterRef, {command, C, Key, #{}}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. +-spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), ered:req_opts()) -> ok. %% %% Like command/4 but asynchronous. Instead of returning the reply, the reply %% function is applied to the reply when it is available. The reply function %% runs in an unspecified process. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> + command_async(ServerRef, Command, Key, ReplyFun, #{}). + +command_async(ServerRef, Command, Key, ReplyFun, Opts) when is_function(ReplyFun, 1), is_map(Opts) -> C = ered_command:convert_to(Command), - gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun, Opts}). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_all(cluster_ref(), command()) -> [reply()]. @@ -317,9 +325,9 @@ init({Addrs, Opts, ClientSup, User}) -> process_flag(trap_exit, true), {ok, start_clients(Addrs, State)}. -handle_call({command, Command, Key}, From, State) -> +handle_call({command, Command, Key, Opts}, From, State) -> Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, From, State, State#st.redirect_attempts), + State1 = send_command_to_slot(Command, Slot, From, Opts, State, State#st.redirect_attempts), {noreply, State1}; handle_call(get_clients, _From, State) -> @@ -334,26 +342,26 @@ handle_call({connect_node, Addr}, _From, State) -> ClientPid = maps:get(Addr, State1#st.nodes), {reply, ClientPid, State1}. -handle_cast({command_async, Command, Key, ReplyFun}, State) -> +handle_cast({command_async, Command, Key, ReplyFun, Opts}, State) -> Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, ReplyFun, State, State#st.redirect_attempts), + State1 = send_command_to_slot(Command, Slot, ReplyFun, Opts, State, State#st.redirect_attempts), {noreply, State1}; handle_cast({replied, To}, State) -> {noreply, State#st{pending_commands = maps:remove(To, State#st.pending_commands)}}; -handle_cast({forward_command, Command, Slot, From, Addr, AttemptsLeft}, State) -> +handle_cast({forward_command, Command, Slot, From, Opts, Addr, AttemptsLeft}, State) -> {Client, State1} = connect_addr(Addr, State), - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), + Fun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun, Opts), {noreply, State1}; -handle_cast({forward_command_asking, Command, Slot, From, Addr, AttemptsLeft, OldReply}, State) -> +handle_cast({forward_command_asking, Command, Slot, From, Opts, Addr, AttemptsLeft, OldReply}, State) -> {Client, State1} = connect_addr(Addr, State), Command1 = ered_command:add_asking(OldReply, Command), - HandleReplyFun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), + HandleReplyFun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), Fun = fun(Reply) -> HandleReplyFun(ered_command:fix_ask_reply(OldReply, Reply)) end, - ered_client:command_async(Client, Command1, Fun), + ered_client:command_async(Client, Command1, Fun, Opts), {noreply, State1}; handle_cast({trigger_map_update, SlotMapVersion, Node}, State) @@ -373,8 +381,8 @@ handle_cast({trigger_map_update, SlotMapVersion, Node}, State) handle_cast({trigger_map_update, _SlotMapVersion, _Node}, State) -> {noreply, State}. -handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) -> - State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft), +handle_info({command_try_again, Command, Slot, From, Opts, AttemptsLeft}, State) -> + State1 = send_command_to_slot(Command, Slot, From, Opts, State, AttemptsLeft), {noreply, State1}; handle_info(Msg = #{msg_type := MsgType, client_id := _Pid, addr := Addr}, State) -> @@ -621,15 +629,15 @@ new_set(List) -> %%% Command handling %%%------------------------------------------------------------------- -send_command_to_slot(Command, Slot, From, State, AttemptsLeft) -> +send_command_to_slot(Command, Slot, From, Opts, State, AttemptsLeft) -> case binary:at(State#st.slots, Slot) of 0 -> reply(From, {error, unmapped_slot}, none), State; Ix -> Client = element(Ix, State#st.clients), - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), + Fun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun, Opts), put_pending_command(From, Client, State) end. @@ -640,10 +648,10 @@ put_pending_command(ReplyFun, _Client, State) when is_function(ReplyFun) -> %% Cast with reply fun. We don't keep track of those. State. -create_reply_fun(_Command, _Slot, _Client, From, _State, 0) -> +create_reply_fun(_Command, _Slot, _Client, From, _Opts, _State, 0) -> Pid = self(), fun(Reply) -> reply(From, Reply, Pid) end; -create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> +create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft) -> Pid = self(), %% Avoid binding the #st record inside the fun since the fun will be %% copied to another process @@ -655,11 +663,11 @@ create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> reply(From, Reply, Pid); {moved, Addr} -> update_slots(Pid, SlotMapVersion, Client), - gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); + gen_server:cast(Pid, {forward_command, Command, Slot, From, Opts, Addr, AttemptsLeft-1}); {ask, Addr} -> - gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); + gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Opts, Addr, AttemptsLeft-1, Reply}); try_again -> - erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); + erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, Opts, AttemptsLeft-1}); cluster_down -> update_slots(Pid, SlotMapVersion, Client), reply(From, Reply, Pid) diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 6ff824b..267a874 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -17,6 +17,8 @@ run_test_() -> {spawn, fun server_buffer_full_reconnect_t/0}, {spawn, fun server_buffer_full_node_goes_down_t/0}, {spawn, fun response_timeout_t/0}, + {spawn, fun buffer_time_t/0}, + {spawn, fun buffer_time_flush_on_unbuffered_t/0}, {spawn, fun send_backoff_tcp_t/0}, {spawn, fun send_backoff_tls_t/0}, {spawn, fun fail_hello_t/0}, @@ -303,6 +305,54 @@ response_timeout_t() -> {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). +buffer_time_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% Both pings should arrive in a single recv + {ok, <<"*1\r\n$4\r\nping\r\n" + "*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, <<"+pong\r\n+pong\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + Pid = self(), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end, + #{buffer_time => 100}), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end, + #{buffer_time => 100}), + {1, {ok, <<"pong">>}} = get_msg(), + {2, {ok, <<"pong">>}} = get_msg(), + no_more_msgs(). + +buffer_time_flush_on_unbuffered_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% Buffered and unbuffered should arrive together + {ok, <<"*1\r\n$4\r\nping\r\n" + "*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, <<"+pong\r\n+pong\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + Pid = self(), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end, + #{buffer_time => 5000}), + %% Unbuffered command should flush the buffered one immediately + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end), + {1, {ok, <<"pong">>}} = get_msg(1000), + {2, {ok, <<"pong">>}} = get_msg(1000), + no_more_msgs(). + send_backoff_tcp_t() -> send_backoff_t(gen_tcp). From 2fa4f9988f4f75142d528196adeba3f2f3db434a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 20 May 2026 11:34:50 +0200 Subject: [PATCH 2/3] Prevent replies from flushing buffered requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/ered_client.erl | 5 +++- test/ered_client_tests.erl | 51 +++++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/ered_client.erl b/src/ered_client.erl index 031832f..8b546e2 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -391,7 +391,10 @@ handle_info({Type, Socket, Data}, #st{socket = Socket} = State) when Type =:= tcp; Type =:= ssl -> %% Receive data from current socket. State1 = handle_data(Data, State), - State2 = process_commands(State1), + State2 = case State1#st.buffer_timer of + none -> process_commands(State1); + _Ref -> State1 + end, {noreply, State2, response_timeout(State2)}; handle_info({Passive, Socket}, #st{socket = Socket} = State) diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 267a874..4612fdd 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -19,6 +19,7 @@ run_test_() -> {spawn, fun response_timeout_t/0}, {spawn, fun buffer_time_t/0}, {spawn, fun buffer_time_flush_on_unbuffered_t/0}, + {spawn, fun buffer_time_not_flushed_by_reply_t/0}, {spawn, fun send_backoff_tcp_t/0}, {spawn, fun send_backoff_tls_t/0}, {spawn, fun fail_hello_t/0}, @@ -330,7 +331,7 @@ buffer_time_t() -> no_more_msgs(). buffer_time_flush_on_unbuffered_t() -> - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> {ok, Sock} = gen_tcp:accept(ListenSock), @@ -353,6 +354,54 @@ buffer_time_flush_on_unbuffered_t() -> {2, {ok, <<"pong">>}} = get_msg(1000), no_more_msgs(). +buffer_time_not_flushed_by_reply_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(ListenSock), + Ping = <<"*1\r\n$4\r\nping\r\n">>, + Pid = self(), + ServerPid = spawn_link( + fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% First: the unbuffered ping arrives + {ok, Ping} = gen_tcp:recv(Sock, size(Ping)), + %% Wait for the test to send the buffered command + receive send_reply -> ok end, + %% Reply to it; this should NOT flush the buffered command + ok = gen_tcp:send(Sock, <<"+pong1\r\n">>), + %% Nothing more should arrive yet (buffered cmd still waiting) + {error, timeout} = gen_tcp:recv(Sock, 0, 100), + Pid ! server_recv_timeout, + %% The third (unbuffered) ping flushes the buffered one + TwoPings = <>, + {ok, TwoPings} = gen_tcp:recv(Sock, size(TwoPings)), + ok = gen_tcp:send(Sock, <<"+pong2\r\n+pong3\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + %% Send an unbuffered command; it is sent immediately + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end), + %% Send a buffered command; it stays in the waiting queue + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end, + #{buffer_time => 60000}), + %% Sync call to ensure both casts have been processed, so the + %% buffered command is in the waiting queue when the reply arrives. + _ = sys:get_state(Client), + %% Now let the server reply; the reply must not flush the buffered command + ServerPid ! send_reply, + %% Receive reply to first command + {1, {ok, <<"pong1">>}} = get_msg(1000), + %% Wait for the server to confirm nothing else was received + server_recv_timeout = get_msg(2000), + %% Send an unbuffered command to flush the buffer + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {3, Reply} end), + {2, {ok, <<"pong2">>}} = get_msg(1000), + {3, {ok, <<"pong3">>}} = get_msg(1000), + no_more_msgs(). + send_backoff_tcp_t() -> send_backoff_t(gen_tcp). From 9ea72689d46fc1888493cc1baa72812a2709456a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 26 May 2026 11:32:15 +0200 Subject: [PATCH 3/3] Address review comment about exported type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/ered_cluster.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 574e346..17bf906 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -24,11 +24,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, format_status/1]). - -export_type([cluster_ref/0, client_ref/0, opt/0, addr/0, + req_opts/0, command/0, reply/0, key/0]). @@ -109,6 +109,7 @@ -type addr_set() :: sets:set(addr()). -type cluster_ref() :: gen_server:server_ref(). -type client_ref() :: pid(). +-type req_opts() :: ered:req_opts(). -type command() :: ered_command:command(). -type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. -type key() :: binary(). @@ -176,7 +177,7 @@ close(ClusterRef) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(cluster_ref(), command(), key()) -> reply(). --spec command(cluster_ref(), command(), key(), timeout() | ered:req_opts()) -> reply(). +-spec command(cluster_ref(), command(), key(), timeout() | req_opts()) -> reply(). %% %% Send a command to the cluster. The command will be routed to %% the correct cluster node client based on the provided key. @@ -201,7 +202,7 @@ command(ClusterRef, Command, Key, Timeout) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. --spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), ered:req_opts()) -> ok. +-spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), req_opts()) -> ok. %% %% Like command/4 but asynchronous. Instead of returning the reply, the reply %% function is applied to the reply when it is available. The reply function