Przeglądaj źródła

refactor(emqx): the congestion alarm

Shawn 4 lat temu
rodzic
commit
df0e905754
5 zmienionych plików z 72 dodań i 64 usunięć
  1. 24 0
      etc/emqx.conf
  2. 14 0
      priv/emqx.schema
  3. 2 2
      src/emqx_alarm.erl
  4. 26 49
      src/emqx_congestion.erl
  5. 6 13
      src/emqx_connection.erl

+ 24 - 0
etc/emqx.conf

@@ -876,6 +876,30 @@ zone.external.enable_flapping_detect = off
 ## Example: 100KB incoming per 10 seconds.
 #zone.external.rate_limit.conn_bytes_in = 100KB,10s
 
+## Whether to alarm the congested connections.
+##
+## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
+## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
+## full. If more packets comes after that, the packets will be "pending" in a queue
+## and we consider the connection is "congested".
+##
+## Enable this to send an alarm when there's any bytes pending in the queue. You could set
+## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
+##
+## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
+## Where the <ClientID> is the client-id of the congested MQTT connection.
+## And the <Username> is the username or "unknown_user" of not provided by the client.
+## Default: off
+#zone.external.conn_congestion.alarm = off
+
+## Won't clear the congested alarm in how long time.
+## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
+## `wont_clear_alarm_in` time since the last time we considered the connection is "congested".
+##
+## This is to avoid clearing and sending the alarm again too often.
+## Default: 1m
+#zone.external.conn_congestion.wont_clear_alarm_in = 1m
+
 ## Messages quota for the each of external MQTT connection.
 ## This value consumed by the number of recipient on a message.
 ##

+ 14 - 0
priv/emqx.schema

@@ -1015,6 +1015,16 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
+  {datatype, flag},
+  {default, off}
+]}.
+
+{mapping, "zone.$name.conn_congestion.wont_clear_alarm_in", "emqx.zones", [
+  {default, "1m"},
+  {datatype, {duration, ms}}
+]}.
+
 {mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
   {datatype, string}
 ]}.
@@ -1144,6 +1154,10 @@ end}.
                    {ratelimit, {conn_messages_in, Ratelimit(Val)}};
                (["rate_limit", "conn_bytes_in"], Val) ->
                    {ratelimit, {conn_bytes_in, Ratelimit(Val)}};
+               (["conn_congestion", "alarm"], Val) ->
+                   {conn_congestion_alarm_enabled, Val};
+               (["conn_congestion", "wont_clear_alarm_in"], Val) ->
+                   {conn_congestion_wont_clear_alarm_in, Val};
                (["quota", "conn_messages_routing"], Val) ->
                    {quota, {conn_messages_routing, Ratelimit(Val)}};
                (["quota", "overall_messages_routing"], Val) ->

+ 2 - 2
src/emqx_alarm.erl

@@ -379,7 +379,7 @@ normalize_message(partition, #{occurred := Node}) ->
     list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
 normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
     list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
-normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
-    list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
+normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
+    list_to_binary(io_lib:format("connection congested: ~s", [Info]));
 normalize_message(_Name, _UnknownDetails) ->
     <<"Unknown alarm">>.

+ 26 - 49
src/emqx_congestion.erl

@@ -16,22 +16,16 @@
 
 -module(emqx_congestion).
 
--export([ maybe_alarm_port_busy/3
-        , maybe_alarm_port_busy/4
-        , maybe_alarm_too_many_publish/5
-        , maybe_alarm_too_many_publish/6
+-export([ maybe_alarm_conn_congestion/3
         , cancel_alarms/3
         ]).
 
--elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]).
-
 -define(ALARM_CONN_CONGEST(Channel, Reason),
         list_to_binary(
-          io_lib:format("mqtt_conn/congested/~s/~s/~s",
-                        [emqx_channel:info(clientid, Channel),
+          io_lib:format("~s/~s/~s",
+                        [Reason, emqx_channel:info(clientid, Channel),
                          maps:get(username, emqx_channel:info(clientinfo, Channel),
-                                  <<"undefined">>),
-                         Reason]))).
+                                  <<"unknown_user">>)]))).
 
 -define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username,
                                proto_name, proto_ver, connected_at, conn_state]).
@@ -39,44 +33,28 @@
 -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
 -define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
 -define(ALARM_SENT(REASON), {alarm_sent, REASON}).
--define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
--define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
--define(CONFIRM_CLEAR_INTERVAL, 10000).
-
-maybe_alarm_port_busy(Socket, Transport, Channel) ->
-    maybe_alarm_port_busy(Socket, Transport, Channel, false).
-
-maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
-    case is_tcp_congested(Socket, Transport) of
-        true -> alarm_congestion(Socket, Transport, Channel, port_busy);
-        false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
-                    ForceClear)
-    end.
+-define(ALL_ALARM_REASONS, [conn_congestion]).
+-define(WONT_CLEAR_IN, 60000).
 
-maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
-        MaxBatchSize) ->
-    maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
-        MaxBatchSize, false).
-
-maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
-        PubMsgCount = _MaxBatchSize, _ForceClear) ->
-    %% we only alarm it when the process is "too busy"
-    alarm_congestion(Socket, Transport, Channel, too_many_publish);
-maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
-        _MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
-    %% but we clear the alarm until it is really "idle", to avoid sending
-    %% alarms and clears too frequently
-    cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
-        ForceClear);
-maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
-        _MaxBatchSize, _ForceClear) ->
-    ok.
+maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
+    case is_alarm_enabled(Channel) of
+        false -> ok;
+        true ->
+            case is_tcp_congested(Socket, Transport) of
+                true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
+                false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
+            end
+    end.
 
 cancel_alarms(Socket, Transport, Channel) ->
     lists:foreach(fun(Reason) ->
         do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
     end, ?ALL_ALARM_REASONS).
 
+is_alarm_enabled(Channel) ->
+    emqx_zone:get_env(emqx_channel:info(zone, Channel),
+        conn_congestion_alarm_enabled, false).
+
 alarm_congestion(Socket, Transport, Channel, Reason) ->
     case has_alarm_sent(Reason) of
         false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@@ -85,8 +63,10 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
             update_alarm_sent_at(Reason)
     end.
 
-cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
-    case is_alarm_allowed_clear(Reason, ForceClear) of
+cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
+    Zone = emqx_channel:info(zone, Channel),
+    WontClearIn = emqx_zone:get_env(Zone, conn_congestion_wont_clear_alarm_in, ?WONT_CLEAR_IN),
+    case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
         true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
         false -> ok
     end.
@@ -125,14 +105,11 @@ get_alarm_sent_at(Reason) ->
         undefined -> 0;
         LastSentAt -> LastSentAt
     end.
-
-is_alarm_allowed_clear(Reason, _ForceClear = true) ->
-    has_alarm_sent(Reason);
-is_alarm_allowed_clear(Reason, _ForceClear = false) ->
+long_time_since_last_alarm(Reason, WontClearIn) ->
     %% only sent clears when the alarm was not triggered in the last
-    %% ?CONFIRM_CLEAR_INTERVAL time
+    %% WontClearIn time
     case timenow() - get_alarm_sent_at(Reason) of
-        Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
+        Elapse when Elapse >= WontClearIn -> true;
         _ -> false
     end.
 

+ 6 - 13
src/emqx_connection.erl

@@ -390,12 +390,8 @@ handle_msg({Passive, _Sock}, State)
     handle_info(activate_socket, NState1);
 
 handle_msg(Deliver = {deliver, _Topic, _Msg},
-           #state{active_n = MaxBatchSize, transport = Transport,
-                  socket = Socket, channel = Channel} = State) ->
-    Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
-    emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
-        length(Delivers0), MaxBatchSize),
-    Delivers = [Deliver|Delivers0],
+           #state{active_n = ActiveN} = State) ->
+    Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
 
 %% Something sent
@@ -509,12 +505,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
                         },
     handle_info(activate_socket, NState);
 
-handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
-        channel = Channel, transport = Transport, socket = Socket}) ->
-    {_, MsgQLen} = erlang:process_info(self(), message_queue_len),
-    emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
-    emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
-        MsgQLen, MaxBatchSize, true),
+handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
+        socket = Socket}) ->
+    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:set_chan_stats(ClientId, stats(State)),
     {ok, State#state{stats_timer = undefined}};
@@ -627,7 +620,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
     Oct = iolist_size(IoData),
     ok = emqx_metrics:inc('bytes.sent', Oct),
     inc_counter(outgoing_bytes, Oct),
-    emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
+    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
     case Transport:async_send(Socket, IoData, [nosuspend]) of
         ok -> ok;
         Error = {error, _Reason} ->