Просмотр исходного кода

feature(tcp): alarm when tcp connection congested

Shawn 5 лет назад
Родитель
Сommit
d00ea48752
5 измененных файлов с 63 добавлено и 3 удалено
  1. 7 0
      etc/emqx.conf
  2. 11 0
      priv/emqx.schema
  3. 2 0
      src/emqx_alarm.erl
  4. 38 3
      src/emqx_connection.erl
  5. 5 0
      test/emqx_connection_SUITE.erl

+ 7 - 0
etc/emqx.conf

@@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on
 ## Value: on | off
 ## listener.tcp.external.tune_buffer = off
 
+## The socket is set to a busy state when the amount of data queued internally
+## by the ERTS socket implementation reaches this limit.
+##
+## Value: on | off
+## Defaults to 1MB
+## listener.tcp.external.high_watermark = 1MB
+
 ## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
 ## sent immediately if the option is enabled.
 ##

+ 11 - 0
priv/emqx.schema

@@ -1244,6 +1244,11 @@ end}.
   hidden
 ]}.
 
+{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [
+   {datatype, bytesize},
+   {default, "1MB"}
+ ]}.
+
 {mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [
   {datatype, flag},
   hidden
@@ -1336,6 +1341,11 @@ end}.
   hidden
 ]}.
 
+{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [
+   {datatype, bytesize},
+   {default, "1MB"}
+ ]}.
+
 {mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [
   {datatype, flag},
   hidden
@@ -1844,6 +1854,7 @@ end}.
                           {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
                           {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
                           {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
+                          {high_watermark,  cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)},
                           {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
                           {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
               end,

+ 2 - 0
src/emqx_alarm.erl

@@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) ->
     list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
 normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
     list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
+normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
+     list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
 normalize_message(_Name, _UnknownDetails) ->
     <<"Unknown alarm">>.
 

+ 38 - 3
src/emqx_connection.erl

@@ -103,6 +103,9 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
+-define(ALARM_TCP_CONGEST(Channel),
+         list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))).
+
 -dialyzer({no_match, [info/2]}).
 -dialyzer({nowarn_function, [ init/4
                             , init_state/3
@@ -429,6 +432,7 @@ handle_msg(Msg, State) ->
 
 terminate(Reason, State = #state{channel = Channel}) ->
     ?LOG(debug, "Terminated due to ~p", [Reason]),
+    emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
     emqx_channel:terminate(Reason, Channel),
     close_socket(State),
     exit(Reason).
@@ -595,11 +599,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 %% Send data
 
 -spec(send(iodata(), state()) -> ok).
-send(IoData, #state{transport = Transport, socket = Socket}) ->
+send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
     Oct = iolist_size(IoData),
     ok = emqx_metrics:inc('bytes.sent', Oct),
     emqx_pd:inc_counter(outgoing_bytes, Oct),
-    case Transport:async_send(Socket, IoData) of
+    maybe_warn_congestion(Socket, Transport, Channel),
+    case Transport:async_send(Socket, IoData, [nosuspend]) of
         ok -> ok;
         Error = {error, _Reason} ->
             %% Send an inet_reply to postpone handling the error
@@ -607,6 +612,36 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
             ok
     end.
 
+maybe_warn_congestion(Socket, Transport, Channel) ->
+    IsCongestAlarmSet = is_congestion_alarm_set(),
+    case is_congested(Socket, Transport) of
+        true when not IsCongestAlarmSet ->
+            {ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]),
+            {ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]),
+            ok = set_congestion_alarm(),
+            emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts));
+        false when IsCongestAlarmSet ->
+            ok = clear_congestion_alarm(),
+            emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
+        _ -> ok
+    end.
+
+is_congested(Socket, Transport) ->
+    case Transport:getstat(Socket, [send_pend]) of
+        {ok, [{send_pend, N}]} when N > 0 -> true;
+        _ -> false
+    end.
+
+is_congestion_alarm_set() ->
+    case erlang:get(conn_congested) of
+        true -> true;
+        _ -> false
+    end.
+set_congestion_alarm() ->
+    erlang:put(conn_congested, true), ok.
+clear_congestion_alarm() ->
+    erlang:put(conn_congested, false), ok.
+
 %%--------------------------------------------------------------------
 %% Handle Info
 
@@ -622,7 +657,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     end;
 
 handle_info({sock_error, Reason}, State) ->
-    ?LOG(debug, "Socket error: ~p", [Reason]),
+    Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
     handle_info({sock_closed, Reason}, close_socket(State));
 
 handle_info(Info, State) ->

+ 5 - 0
test/emqx_connection_SUITE.erl

@@ -52,6 +52,9 @@ init_per_suite(Config) ->
 
     ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
 
+    ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
+    ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
+
     Config.
 
 end_per_suite(_Config) ->
@@ -62,6 +65,7 @@ end_per_suite(_Config) ->
     ok = meck:unload(emqx_pd),
     ok = meck:unload(emqx_metrics),
     ok = meck:unload(emqx_hooks),
+    ok = meck:unload(emqx_alarm),
     ok.
 
 init_per_testcase(_TestCase, Config) ->
@@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) ->
                                                       {ok, [{K, 0} || K <- Options]}
                                               end),
     ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
+    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
     ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
     Config.