Просмотр исходного кода

perf: eliminate the loop-back fake async inet_reply message

Now that EMQX is on OTP 26 by default, the fake inet_reply
message when the send call returns ok only adds overhead.

For TCP (gen_tcp module), OTP 26 no longer supports direct port_command
so we had to swith to gen_tcp:send, then fake an async reply:
see https://github.com/emqx/esockd/pull/181

For TLS (ssl module), it has never supported async send anyway.

This commit switches to call Transport:send/2 directly,
this should result in one less loop-back message for each message.

For OTP 25 on which EMQX can still be compiled, one will have to
suffer the performance penalty for gen_tcp:send function to
do a non-optimized 'receive' for the send result.
zmstone 1 год назад
Родитель
Сommit
2525bd0c16

+ 30 - 49
apps/emqx/src/emqx_connection.erl

@@ -158,31 +158,6 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
--define(ALARM_TCP_CONGEST(Channel),
-    list_to_binary(
-        io_lib:format(
-            "mqtt_conn/congested/~ts/~ts",
-            [
-                emqx_channel:info(clientid, Channel),
-                emqx_channel:info(username, Channel)
-            ]
-        )
-    )
-).
-
--define(ALARM_CONN_INFO_KEYS, [
-    socktype,
-    sockname,
-    peername,
-    clientid,
-    username,
-    proto_name,
-    proto_ver,
-    connected_at
-]).
--define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
--define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-
 -define(LIMITER_BYTES_IN, bytes).
 -define(LIMITER_MESSAGE_IN, messages).
 
@@ -603,17 +578,6 @@ handle_msg(
     ActiveN = get_active_n(Type, Listener),
     Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
-%% Something sent
-handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) ->
-    case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
-        true ->
-            Pubs = emqx_pd:reset_counter(outgoing_pubs),
-            Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
-        false ->
-            ok
-    end;
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 handle_msg({connack, ConnAck}, State) ->
@@ -729,9 +693,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
             shutdown(Reason, Reply, State#state{channel = NChannel});
         {shutdown, Reason, Reply, OutPacket, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(OutPacket, NState),
-            NState2 = graceful_shutdown_transport(Reason, NState),
-            shutdown(Reason, Reply, NState2)
+            {ok, NState2} = handle_outgoing(OutPacket, NState),
+            NState3 = graceful_shutdown_transport(Reason, NState2),
+            shutdown(Reason, Reply, NState3)
     end.
 
 %%--------------------------------------------------------------------
@@ -854,8 +818,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
             shutdown(Reason, State#state{channel = NChannel});
         {shutdown, Reason, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, NState)
+            {ok, NState2} = handle_outgoing(Packet, NState),
+            shutdown(Reason, NState2)
     end.
 
 %%--------------------------------------------------------------------
@@ -909,19 +873,36 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 %%--------------------------------------------------------------------
 %% Send data
 
--spec send(iodata(), state()) -> ok.
-send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
+-spec send(iodata(), state()) -> {ok, state()}.
+send(IoData, #state{transport = Transport, socket = Socket} = State) ->
     Oct = iolist_size(IoData),
-    ok = emqx_metrics:inc('bytes.sent', Oct),
+    emqx_metrics:inc('bytes.sent', Oct),
     inc_counter(outgoing_bytes, Oct),
-    case Transport:async_send(Socket, IoData, []) of
+    case Transport:send(Socket, IoData) of
         ok ->
-            ok;
+            %% NOTE: for Transport=emqx_quic_stream, it's actually an
+            %% async_send, sent/1 should technically be called when
+            %% {quic, send_complete, _Stream, true | false} is received,
+            %% but it is handled early for simplicity
+            sent(State);
         Error = {error, _Reason} ->
-            %% Send an inet_reply to postpone handling the error
-            %% @FIXME: why not just return error?
+            %% Defer error handling
+            %% so it's handled the same as tcp_closed or ssl_closed
             self() ! {inet_reply, Socket, Error},
-            ok
+            {ok, State}
+    end.
+
+%% Some bytes sent
+sent(#state{listener = {Type, Listener}} = State) ->
+    %% Run GC and check OOM after certain amount of messages or bytes sent.
+    case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
+        true ->
+            Pubs = emqx_pd:reset_counter(outgoing_pubs),
+            Bytes = emqx_pd:reset_counter(outgoing_bytes),
+            OutStats = #{cnt => Pubs, oct => Bytes},
+            {ok, check_oom(run_gc(OutStats, State))};
+        false ->
+            {ok, State}
     end.
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx/src/emqx_quic_stream.erl

@@ -34,7 +34,7 @@
     fast_close/1,
     shutdown/2,
     ensure_ok_or_exit/2,
-    async_send/3,
+    send/2,
     setopts/2,
     getopts/2,
     peername/1,
@@ -165,7 +165,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
             Result
     end.
 
-async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
+send({quic, _Conn, Stream, _Info}, Data) ->
     case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
         {ok, _Len} -> ok;
         {error, X, Y} -> {error, {X, Y}};

+ 9 - 13
apps/emqx/test/emqx_connection_SUITE.erl

@@ -94,8 +94,7 @@ init_per_testcase(TestCase, Config) when
     ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
         {ok, [{K, 0} || K <- Options]}
     end),
-    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
-    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
+    ok = meck:expect(emqx_transport, send, fun(_Sock, _Data) -> ok end),
     ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
     case erlang:function_exported(?MODULE, TestCase, 2) of
         true -> ?MODULE:TestCase(init, Config);
@@ -234,9 +233,11 @@ t_handle_msg_incoming(_) ->
     ?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())).
 
 t_handle_msg_outgoing(_) ->
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
+    ?assertMatch(
+        {ok, _}, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())
+    ),
+    ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
+    ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
 
 t_handle_msg_tcp_error(_) ->
     ?assertMatch(
@@ -255,18 +256,13 @@ t_handle_msg_deliver(_) ->
     ?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())).
 
 t_handle_msg_inet_reply(_) ->
-    ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
-    emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0),
-    ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
-    emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100),
-    ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
     ?assertMatch(
         {stop, {shutdown, for_testing}, _St},
         handle_msg({inet_reply, for_testing, {error, for_testing}}, st())
     ).
 
 t_handle_msg_connack(_) ->
-    ?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
+    ?assertMatch({ok, _}, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
 
 t_handle_msg_close(_) ->
     ?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())).
@@ -388,8 +384,8 @@ t_with_channel(_) ->
     meck:unload(emqx_channel).
 
 t_handle_outgoing(_) ->
-    ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
-    ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
+    ?assertMatch({ok, _}, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
+    ?assertMatch({ok, _}, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
 
 t_handle_info(_) ->
     ?assertMatch(

+ 29 - 27
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -242,7 +242,7 @@ esockd_send(Data, #state{
 }) ->
     gen_udp:send(Sock, Ip, Port, Data);
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
-    esockd_transport:async_send(Sock, Data).
+    esockd_transport:send(Sock, Data).
 
 keepalive_stats(recv) ->
     emqx_pd:get_counter(recv_pkt);
@@ -503,18 +503,6 @@ handle_msg(
 ) ->
     Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
-%% Something sent
-%% TODO: Who will deliver this message?
-handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
-    case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
-        true ->
-            Pubs = emqx_pd:reset_counter(outgoing_pkt),
-            Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
-        false ->
-            ok
-    end;
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 handle_msg({close, Reason}, State) ->
@@ -630,8 +618,8 @@ handle_call(
             shutdown(Reason, Reply, State#state{channel = NChannel});
         {shutdown, Reason, Reply, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, Reply, NState)
+            {ok, NState1} = handle_outgoing(Packet, NState),
+            shutdown(Reason, Reply, NState1)
     end.
 
 %%--------------------------------------------------------------------
@@ -772,15 +760,15 @@ with_channel(
             shutdown(Reason, State#state{channel = NChannel});
         {shutdown, Reason, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, NState)
+            {ok, NState1} = handle_outgoing(Packet, NState),
+            shutdown(Reason, NState1)
     end.
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(_Packets = [], _State) ->
-    ok;
+handle_outgoing(_Packets = [], State) ->
+    {ok, State};
 handle_outgoing(
     Packets,
     State = #state{socket = Socket}
@@ -792,12 +780,15 @@ handle_outgoing(
                 State
             );
         _ ->
-            lists:foreach(
-                fun(Packet) ->
-                    handle_outgoing(Packet, State)
+            NState = lists:foldl(
+                fun(Packet, State0) ->
+                    {ok, State1} = handle_outgoing(Packet, State0),
+                    State1
                 end,
+                State,
                 Packets
-            )
+            ),
+            {ok, NState}
     end;
 handle_outgoing(Packet, State) ->
     send((serialize_and_inc_stats_fun(State))(Packet), State).
@@ -842,7 +833,7 @@ serialize_and_inc_stats_fun(#state{
 %%--------------------------------------------------------------------
 %% Send data
 
--spec send(iodata(), state()) -> ok.
+-spec send(iodata(), state()) -> {ok, state()}.
 send(
     IoData,
     State = #state{
@@ -858,11 +849,22 @@ send(
     inc_counter(outgoing_bytes, Oct),
     case esockd_send(IoData, State) of
         ok ->
-            ok;
+            sent(State);
         Error = {error, _Reason} ->
-            %% Send an inet_reply to postpone handling the error
+            %% Send an inet_reply to defer handling the error
             self() ! {inet_reply, Socket, Error},
-            ok
+            {ok, State}
+    end.
+
+sent(#state{active_n = ActiveN} = State) ->
+    case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
+        true ->
+            Pubs = emqx_pd:reset_counter(outgoing_pkt),
+            Bytes = emqx_pd:reset_counter(outgoing_bytes),
+            OutStats = #{cnt => Pubs, oct => Bytes},
+            {ok, check_oom(run_gc(OutStats, State))};
+        false ->
+            {ok, State}
     end.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.1.32"},
+    {vsn, "0.1.33"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]},