diff --git a/README.md b/README.md index 53ef8f9..3d61b39 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ Closes the connection. ```Erlang ered:command(client_ref(), command()) -> reply(). ered:command(client_ref(), command(), timeout()) -> reply(). +ered:command(client_ref(), command(), req_opts()) -> reply(). ``` Send a command and return the reply. @@ -95,10 +96,14 @@ work as expected. For cluster clients, a key must be provided. Omitting timeout is the same as setting the timeout to infinity. -### `ered:command_async/3` +The third argument can be a timeout or a map of request options. See [Request +options](#request-options) below. + +### `ered:command_async/3,4` ```Erlang ered:command_async(client_ref(), command(), fun((reply()) -> any())) -> ok. +ered:command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok. ``` Like command/2,3 but asynchronous. Instead of returning the reply, the reply @@ -142,6 +147,7 @@ cluster. ```Erlang ered_cluster:command(cluster_ref(), command(), key()) -> reply(). ered_cluster:command(cluster_ref(), command(), key(), timeout()) -> reply(). +ered_cluster:command(cluster_ref(), command(), key(), req_opts()) -> reply(). ``` Send a command. The command is routed to @@ -154,10 +160,14 @@ then they need to all map to the same slot for things to work as expected. Omitting timeout is the same as setting the timeout to infinity. -### `ered_cluster:command_async/4` +The fourth argument can be a timeout or a map of request options. See [Request +options](#request-options) below. + +### `ered_cluster:command_async/4,5` ```Erlang ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. +ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), req_opts()) -> ok. ``` Like command/3,4 but asynchronous. Instead of returning the reply, the reply @@ -370,6 +380,29 @@ options, as `{client_opts, [{connection_opts, [...]}]}`. When a timeout happens, the connection is closed and the client attempts to set up a new connection. See the client option `node_down_timeout` above. +Request options +--------------- + +The command functions accept an optional map of request options instead of a +plain timeout. The following keys are recognized: + +* `timeout => timeout()` + + Timeout for the `gen_server:call`. Same as passing a timeout directly. + Default infinity. + +* `buffer_time => non_neg_integer()` + + Buffer time in milliseconds. When non-zero, the command is not sent + immediately but buffered. A timer is started and the buffered commands are + flushed when the timer fires. If a command with `buffer_time => 0` (the + default) arrives while there are buffered commands, all buffered commands are + flushed immediately together with the new command. This can be used to + coalesce multiple commands into fewer TCP packets and TLS records. + + If multiple buffered commands arrive with different buffer times, the shortest + remaining time is used. + Info messages ------------- diff --git a/src/ered.erl b/src/ered.erl index 0510908..4bd7d92 100644 --- a/src/ered.erl +++ b/src/ered.erl @@ -7,7 +7,7 @@ -export([connect/3, close/1, command/2, command/3, - command_async/3]). + command_async/3, command_async/4]). -export_type([opt/0, addr/0, @@ -15,6 +15,7 @@ command/0, reply/0, reply_fun/0, + req_opts/0, client_ref/0]). %%%=================================================================== @@ -27,6 +28,7 @@ -type command() :: ered_command:command(). -type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. -type reply_fun() :: ered_client:reply_fun(). +-type req_opts() :: #{timeout => timeout(), buffer_time => non_neg_integer()}. -type client_ref() :: gen_server:server_ref(). %%%=================================================================== @@ -51,7 +53,7 @@ close(Pid) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(client_ref(), command()) -> reply(). --spec command(client_ref(), command(), timeout()) -> reply(). +-spec command(client_ref(), command(), timeout() | req_opts()) -> reply(). %% %% Send a command. %% If the command is a single command then it is represented as a @@ -64,10 +66,13 @@ close(Pid) -> command(Pid, Command) -> ered_client:command(Pid, Command, infinity). command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> - ered_client:command(Pid, Command, Timeout). + ered_client:command(Pid, Command, Timeout); +command(Pid, Command, Opts) when is_map(Opts) -> + ered_client:command(Pid, Command, Opts). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(client_ref(), command(), fun((reply()) -> any())) -> ok. +-spec command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok. %% %% Like command/2,3 but asynchronous. Instead of returning the reply, %% the reply function is applied to the reply when it is available. @@ -76,3 +81,6 @@ command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(Pid, Command, ReplyFun) when is_function(ReplyFun, 1) -> ered_client:command_async(Pid, Command, ReplyFun). + +command_async(Pid, Command, ReplyFun, Opts) when is_function(ReplyFun, 1), is_map(Opts) -> + ered_client:command_async(Pid, Command, ReplyFun, Opts). diff --git a/src/ered_client.erl b/src/ered_client.erl index 4045482..8b546e2 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -13,7 +13,7 @@ connect/3, close/1, deactivate/1, reactivate/1, command/2, command/3, - command_async/3]). + command_async/3, command_async/4]). %% testing/debugging -export([state_to_map/1]). @@ -83,6 +83,8 @@ status :: init | up | node_down | node_deactivated, node_down_timer = none :: none | reference(), connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond) + buffer_until = none :: none | integer(), % erlang:monotonic_time(millisecond) + buffer_timer = none :: none | reference(), opts = #opts{} }). @@ -258,7 +260,7 @@ reactivate(ServerRef) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(pid(), ered_command:command()) -> reply(). --spec command(pid(), ered_command:command(), timeout()) -> reply(). +-spec command(pid(), ered_command:command(), timeout() | ered:req_opts()) -> reply(). %% %% Send a command to the connected node. The argument can be a %% single command as a list of binaries, a pipeline of command as a @@ -267,11 +269,16 @@ reactivate(ServerRef) -> command(ServerRef, Command) -> command(ServerRef, Command, infinity). -command(ServerRef, Command, Timeout) -> - gen_server:call(ServerRef, {command, ered_command:convert_to(Command)}, Timeout). +command(ServerRef, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity -> + gen_server:call(ServerRef, {command, ered_command:convert_to(Command), 0}, Timeout); +command(ServerRef, Command, Opts) when is_map(Opts) -> + Timeout = maps:get(timeout, Opts, infinity), + BufferTime = maps:get(buffer_time, Opts, 0), + gen_server:call(ServerRef, {command, ered_command:convert_to(Command), BufferTime}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(pid(), ered_command:command(), reply_fun()) -> ok. +-spec command_async(pid(), ered_command:command(), reply_fun(), ered:req_opts()) -> ok. %% %% Send a command to the connected node in asynchronous %% fashion. The provided callback function will be called with the @@ -279,8 +286,13 @@ command(ServerRef, Command, Timeout) -> %% client process and should not hang or perform any lengthy task. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, CallbackFun) -> - gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), - replyto = CallbackFun}). + command_async(ServerRef, Command, CallbackFun, #{}). + +command_async(ServerRef, Command, CallbackFun, Opts) when is_map(Opts) -> + BufferTime = maps:get(buffer_time, Opts, 0), + gen_server:cast(ServerRef, {#command{data = ered_command:convert_to(Command), + replyto = CallbackFun}, + BufferTime}). %% Converts a state record to a map, for easier testing. %% Used in tests, after calling sys:get_state(EredClientPid). @@ -347,16 +359,16 @@ handle_connection_opts(OptsRecord, Opts) -> timeout = ResponseTimeout, push_cb = PushCb}. -handle_call({command, Command}, From, State) -> +handle_call({command, Command, BufferTime}, From, State) -> Fun = fun(Reply) -> gen_server:reply(From, Reply) end, - handle_cast(#command{data = Command, replyto = Fun}, State). + handle_cast({#command{data = Command, replyto = Fun}, BufferTime}, State). -handle_cast(Command = #command{}, State) -> +handle_cast({Command = #command{}, BufferTime}, State) -> case State#st.status of Up when Up =:= up; Up =:= init -> State1 = State#st{waiting = q_in(Command, State#st.waiting)}, - State2 = process_commands(State1), + State2 = maybe_buffer(BufferTime, State1), {noreply, State2, response_timeout(State2)}; NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated -> reply_command(Command, {error, NodeProblem}), @@ -379,7 +391,10 @@ handle_info({Type, Socket, Data}, #st{socket = Socket} = State) when Type =:= tcp; Type =:= ssl -> %% Receive data from current socket. State1 = handle_data(Data, State), - State2 = process_commands(State1), + State2 = case State1#st.buffer_timer of + none -> process_commands(State1); + _Ref -> State1 + end, {noreply, State2, response_timeout(State2)}; handle_info({Passive, Socket}, #st{socket = Socket} = State) @@ -456,6 +471,11 @@ handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.nod State2 = reply_all({error, node_down}, State1), {noreply, process_commands(State2#st{status = node_down})}; +handle_info({timeout, Ref, flush_buffer}, #st{buffer_timer = Ref} = State) -> + State1 = State#st{buffer_timer = none, buffer_until = none}, + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; + handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none -> %% Request timeout Transport = State#st.opts#opts.transport, @@ -807,6 +827,32 @@ response_timeout(State) when not ?q_is_empty(State#st.pending) -> response_timeout(_State) -> infinity. +maybe_buffer(0, State) -> + cancel_buffer_timer(process_commands(State)); +maybe_buffer(BufferTime, State) -> + Now = erlang:monotonic_time(millisecond), + Until = Now + BufferTime, + case State#st.buffer_until of + none -> + start_buffer_timer(Until, State); + Existing when Until < Existing -> + erlang:cancel_timer(State#st.buffer_timer), + start_buffer_timer(Until, State); + _Later -> + State + end. + +start_buffer_timer(Until, State) -> + Ms = max(1, Until - erlang:monotonic_time(millisecond)), + Ref = erlang:start_timer(Ms, self(), flush_buffer), + State#st{buffer_timer = Ref, buffer_until = Until}. + +cancel_buffer_timer(#st{buffer_timer = none} = State) -> + State; +cancel_buffer_timer(#st{buffer_timer = Ref} = State) -> + erlang:cancel_timer(Ref), + State#st{buffer_timer = none, buffer_until = none}. + reply_command(#command{replyto = Fun} = _Command, Reply) -> Fun(Reply). diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 5e4139c..17bf906 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -9,7 +9,7 @@ %% API -export([connect/2, close/1, - command/3, command/4, command_async/4, + command/3, command/4, command_async/4, command_async/5, command_all/2, command_all/3, get_clients/1, get_addr_to_client_map/1, @@ -24,11 +24,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, format_status/1]). - -export_type([cluster_ref/0, client_ref/0, opt/0, addr/0, + req_opts/0, command/0, reply/0, key/0]). @@ -109,6 +109,7 @@ -type addr_set() :: sets:set(addr()). -type cluster_ref() :: gen_server:server_ref(). -type client_ref() :: pid(). +-type req_opts() :: ered:req_opts(). -type command() :: ered_command:command(). -type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. -type key() :: binary(). @@ -176,7 +177,7 @@ close(ClusterRef) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(cluster_ref(), command(), key()) -> reply(). --spec command(cluster_ref(), command(), key(), timeout()) -> reply(). +-spec command(cluster_ref(), command(), key(), timeout() | req_opts()) -> reply(). %% %% Send a command to the cluster. The command will be routed to %% the correct cluster node client based on the provided key. @@ -191,20 +192,28 @@ close(ClusterRef) -> command(ClusterRef, Command, Key) -> command(ClusterRef, Command, Key, infinity). -command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> +command(ClusterRef, Command, Key, Opts) when is_map(Opts) -> + C = ered_command:convert_to(Command), + Timeout = maps:get(timeout, Opts, infinity), + gen_server:call(ClusterRef, {command, C, Key, Opts}, Timeout); +command(ClusterRef, Command, Key, Timeout) -> C = ered_command:convert_to(Command), - gen_server:call(ClusterRef, {command, C, Key}, Timeout). + gen_server:call(ClusterRef, {command, C, Key, #{}}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. +-spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), req_opts()) -> ok. %% %% Like command/4 but asynchronous. Instead of returning the reply, the reply %% function is applied to the reply when it is available. The reply function %% runs in an unspecified process. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> + command_async(ServerRef, Command, Key, ReplyFun, #{}). + +command_async(ServerRef, Command, Key, ReplyFun, Opts) when is_function(ReplyFun, 1), is_map(Opts) -> C = ered_command:convert_to(Command), - gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun, Opts}). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_all(cluster_ref(), command()) -> [reply()]. @@ -317,9 +326,9 @@ init({Addrs, Opts, ClientSup, User}) -> process_flag(trap_exit, true), {ok, start_clients(Addrs, State)}. -handle_call({command, Command, Key}, From, State) -> +handle_call({command, Command, Key, Opts}, From, State) -> Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, From, State, State#st.redirect_attempts), + State1 = send_command_to_slot(Command, Slot, From, Opts, State, State#st.redirect_attempts), {noreply, State1}; handle_call(get_clients, _From, State) -> @@ -334,26 +343,26 @@ handle_call({connect_node, Addr}, _From, State) -> ClientPid = maps:get(Addr, State1#st.nodes), {reply, ClientPid, State1}. -handle_cast({command_async, Command, Key, ReplyFun}, State) -> +handle_cast({command_async, Command, Key, ReplyFun, Opts}, State) -> Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, ReplyFun, State, State#st.redirect_attempts), + State1 = send_command_to_slot(Command, Slot, ReplyFun, Opts, State, State#st.redirect_attempts), {noreply, State1}; handle_cast({replied, To}, State) -> {noreply, State#st{pending_commands = maps:remove(To, State#st.pending_commands)}}; -handle_cast({forward_command, Command, Slot, From, Addr, AttemptsLeft}, State) -> +handle_cast({forward_command, Command, Slot, From, Opts, Addr, AttemptsLeft}, State) -> {Client, State1} = connect_addr(Addr, State), - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), + Fun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun, Opts), {noreply, State1}; -handle_cast({forward_command_asking, Command, Slot, From, Addr, AttemptsLeft, OldReply}, State) -> +handle_cast({forward_command_asking, Command, Slot, From, Opts, Addr, AttemptsLeft, OldReply}, State) -> {Client, State1} = connect_addr(Addr, State), Command1 = ered_command:add_asking(OldReply, Command), - HandleReplyFun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), + HandleReplyFun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), Fun = fun(Reply) -> HandleReplyFun(ered_command:fix_ask_reply(OldReply, Reply)) end, - ered_client:command_async(Client, Command1, Fun), + ered_client:command_async(Client, Command1, Fun, Opts), {noreply, State1}; handle_cast({trigger_map_update, SlotMapVersion, Node}, State) @@ -373,8 +382,8 @@ handle_cast({trigger_map_update, SlotMapVersion, Node}, State) handle_cast({trigger_map_update, _SlotMapVersion, _Node}, State) -> {noreply, State}. -handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) -> - State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft), +handle_info({command_try_again, Command, Slot, From, Opts, AttemptsLeft}, State) -> + State1 = send_command_to_slot(Command, Slot, From, Opts, State, AttemptsLeft), {noreply, State1}; handle_info(Msg = #{msg_type := MsgType, client_id := _Pid, addr := Addr}, State) -> @@ -621,15 +630,15 @@ new_set(List) -> %%% Command handling %%%------------------------------------------------------------------- -send_command_to_slot(Command, Slot, From, State, AttemptsLeft) -> +send_command_to_slot(Command, Slot, From, Opts, State, AttemptsLeft) -> case binary:at(State#st.slots, Slot) of 0 -> reply(From, {error, unmapped_slot}, none), State; Ix -> Client = element(Ix, State#st.clients), - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), + Fun = create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun, Opts), put_pending_command(From, Client, State) end. @@ -640,10 +649,10 @@ put_pending_command(ReplyFun, _Client, State) when is_function(ReplyFun) -> %% Cast with reply fun. We don't keep track of those. State. -create_reply_fun(_Command, _Slot, _Client, From, _State, 0) -> +create_reply_fun(_Command, _Slot, _Client, From, _Opts, _State, 0) -> Pid = self(), fun(Reply) -> reply(From, Reply, Pid) end; -create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> +create_reply_fun(Command, Slot, Client, From, Opts, State, AttemptsLeft) -> Pid = self(), %% Avoid binding the #st record inside the fun since the fun will be %% copied to another process @@ -655,11 +664,11 @@ create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> reply(From, Reply, Pid); {moved, Addr} -> update_slots(Pid, SlotMapVersion, Client), - gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); + gen_server:cast(Pid, {forward_command, Command, Slot, From, Opts, Addr, AttemptsLeft-1}); {ask, Addr} -> - gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); + gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Opts, Addr, AttemptsLeft-1, Reply}); try_again -> - erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); + erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, Opts, AttemptsLeft-1}); cluster_down -> update_slots(Pid, SlotMapVersion, Client), reply(From, Reply, Pid) diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 6ff824b..4612fdd 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -17,6 +17,9 @@ run_test_() -> {spawn, fun server_buffer_full_reconnect_t/0}, {spawn, fun server_buffer_full_node_goes_down_t/0}, {spawn, fun response_timeout_t/0}, + {spawn, fun buffer_time_t/0}, + {spawn, fun buffer_time_flush_on_unbuffered_t/0}, + {spawn, fun buffer_time_not_flushed_by_reply_t/0}, {spawn, fun send_backoff_tcp_t/0}, {spawn, fun send_backoff_tls_t/0}, {spawn, fun fail_hello_t/0}, @@ -303,6 +306,102 @@ response_timeout_t() -> {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). +buffer_time_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% Both pings should arrive in a single recv + {ok, <<"*1\r\n$4\r\nping\r\n" + "*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, <<"+pong\r\n+pong\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + Pid = self(), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end, + #{buffer_time => 100}), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end, + #{buffer_time => 100}), + {1, {ok, <<"pong">>}} = get_msg(), + {2, {ok, <<"pong">>}} = get_msg(), + no_more_msgs(). + +buffer_time_flush_on_unbuffered_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(ListenSock), + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% Buffered and unbuffered should arrive together + {ok, <<"*1\r\n$4\r\nping\r\n" + "*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, <<"+pong\r\n+pong\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + Pid = self(), + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end, + #{buffer_time => 5000}), + %% Unbuffered command should flush the buffered one immediately + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end), + {1, {ok, <<"pong">>}} = get_msg(1000), + {2, {ok, <<"pong">>}} = get_msg(1000), + no_more_msgs(). + +buffer_time_not_flushed_by_reply_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(ListenSock), + Ping = <<"*1\r\n$4\r\nping\r\n">>, + Pid = self(), + ServerPid = spawn_link( + fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + %% First: the unbuffered ping arrives + {ok, Ping} = gen_tcp:recv(Sock, size(Ping)), + %% Wait for the test to send the buffered command + receive send_reply -> ok end, + %% Reply to it; this should NOT flush the buffered command + ok = gen_tcp:send(Sock, <<"+pong1\r\n">>), + %% Nothing more should arrive yet (buffered cmd still waiting) + {error, timeout} = gen_tcp:recv(Sock, 0, 100), + Pid ! server_recv_timeout, + %% The third (unbuffered) ping flushes the buffered one + TwoPings = <>, + {ok, TwoPings} = gen_tcp:recv(Sock, size(TwoPings)), + ok = gen_tcp:send(Sock, <<"+pong2\r\n+pong3\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port), + expect_connection_up(Client), + %% Send an unbuffered command; it is sent immediately + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {1, Reply} end), + %% Send a buffered command; it stays in the waiting queue + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {2, Reply} end, + #{buffer_time => 60000}), + %% Sync call to ensure both casts have been processed, so the + %% buffered command is in the waiting queue when the reply arrives. + _ = sys:get_state(Client), + %% Now let the server reply; the reply must not flush the buffered command + ServerPid ! send_reply, + %% Receive reply to first command + {1, {ok, <<"pong1">>}} = get_msg(1000), + %% Wait for the server to confirm nothing else was received + server_recv_timeout = get_msg(2000), + %% Send an unbuffered command to flush the buffer + ered_client:command_async(Client, [<<"ping">>], + fun(Reply) -> Pid ! {3, Reply} end), + {2, {ok, <<"pong2">>}} = get_msg(1000), + {3, {ok, <<"pong3">>}} = get_msg(1000), + no_more_msgs(). + send_backoff_tcp_t() -> send_backoff_t(gen_tcp).