Browse Source

feat: enable log throttling for potentially flooding log events

Serge Tupchii 2 years ago
parent
commit
9bd0d1ba1b

+ 2 - 2
apps/emqx/include/logger.hrl

@@ -40,8 +40,8 @@
     end
 ).
 
-%% NOTE: do not forget to add every used msg to the default value of
-%% `log.thorttling.msgs` list.
+%% NOTE: do not forget to use atom for msg and add every used msg to
+%% the default value of `log.thorttling.msgs` list.
 -define(SLOG_THROTTLE(Level, Data),
     ?SLOG_THROTTLE(Level, Data, #{})
 ).

+ 7 - 2
apps/emqx/src/emqx_access_control.erl

@@ -183,8 +183,13 @@ log_result(#{username := Username}, Topic, Action, From, Result) ->
         }
     end,
     case Result of
-        allow -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
-        deny -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"})
+        allow ->
+            ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
+        deny ->
+            ?SLOG_THROTTLE(
+                warning,
+                (LogMeta())#{msg => authorization_permission_denied}
+            )
     end.
 
 %% @private Format authorization rules source.

+ 6 - 6
apps/emqx/src/emqx_channel.erl

@@ -616,10 +616,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
             Msg = packet_to_message(NPacket, NChannel),
             do_publish(PacketId, Msg, NChannel);
         {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
-            ?SLOG(
-                info,
+            ?SLOG_THROTTLE(
+                warning,
                 #{
-                    msg => "cannot_publish_to_topic",
+                    msg => cannot_publish_to_topic_due_to_not_authorized,
                     reason => emqx_reason_codes:name(Rc)
                 },
                 #{topic => Topic}
@@ -635,10 +635,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
                     handle_out(disconnect, Rc, NChannel)
             end;
         {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
-            ?SLOG(
-                info,
+            ?SLOG_THROTTLE(
+                warning,
                 #{
-                    msg => "cannot_publish_to_topic",
+                    msg => cannot_publish_to_topic_due_to_quota_exceeded,
                     reason => emqx_reason_codes:name(Rc)
                 },
                 #{topic => Topic}

+ 12 - 7
apps/emqx/src/emqx_log_throttler.erl

@@ -50,10 +50,10 @@
 -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
 -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
 
--spec allow(logger:level(), string()) -> boolean().
+-spec allow(logger:level(), atom()) -> boolean().
 allow(debug, _Msg) ->
     true;
-allow(_Level, Msg) ->
+allow(_Level, Msg) when is_atom(Msg) ->
     Seq = persistent_term:get(?SEQ_ID(Msg), undefined),
     case Seq of
         undefined ->
@@ -79,8 +79,9 @@ start_link() ->
 
 init([]) ->
     ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
-    TimerRef = schedule_refresh(?TIME_WINDOW_MS),
-    {ok, #{timer_ref => TimerRef}}.
+    CurrentPeriodMs = ?TIME_WINDOW_MS,
+    TimerRef = schedule_refresh(CurrentPeriodMs),
+    {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
 
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -90,8 +91,7 @@ handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
 
-handle_info(refresh, State) ->
-    PeriodMs = ?TIME_WINDOW_MS,
+handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
     Msgs = ?MSGS_LIST,
     DroppedStats = lists:foldl(
         fun(Msg, Acc) ->
@@ -112,7 +112,11 @@ handle_info(refresh, State) ->
         Msgs
     ),
     maybe_log_dropped(DroppedStats, PeriodMs),
-    State1 = State#{timer_ref => schedule_refresh(PeriodMs)},
+    NewPeriodMs = ?TIME_WINDOW_MS,
+    State1 = State#{
+        timer_ref => schedule_refresh(NewPeriodMs),
+        current_period_ms => NewPeriodMs
+    },
     {noreply, State1};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unxpected_info", info => Info}),
@@ -143,4 +147,5 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
     ok.
 
 schedule_refresh(PeriodMs) ->
+    ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
     erlang:send_after(PeriodMs, ?MODULE, refresh).

+ 3 - 3
apps/emqx/src/emqx_session_events.erl

@@ -62,10 +62,10 @@ handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}})
     ok = emqx_metrics:inc('delivery.dropped.queue_full'),
     ok = inc_pd('send_msg.dropped', 1),
     ok = inc_pd('send_msg.dropped.queue_full', 1),
-    ?SLOG(
-        info,
+    ?SLOG_THROTTLE(
+        warning,
         Ctx#{
-            msg => "dropped_msg_due_to_mqueue_is_full",
+            msg => dropped_msg_due_to_mqueue_is_full,
             payload => Msg#message.payload
         },
         #{topic => Msg#message.topic}

+ 27 - 7
apps/emqx/test/emqx_log_throttler_SUITE.erl

@@ -23,8 +23,10 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(THROTTLE_MSG, "test_throttle_msg").
--define(THROTTLE_MSG1, "test_throttle_msg1").
+%% Have to use real msgs, as the schema is guarded by enum.
+-define(THROTTLE_MSG, authorization_permission_denied).
+-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
+-define(TIME_WINDOW, <<"1s">>).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
@@ -39,7 +41,7 @@ init_per_suite(Config) ->
                             #{
                                 log => #{
                                     throttling => #{
-                                        time_window => <<"1s">>, msgs => [?THROTTLE_MSG]
+                                        time_window => ?TIME_WINDOW, msgs => [?THROTTLE_MSG]
                                     }
                                 }
                             }
@@ -70,6 +72,10 @@ end_per_testcase(t_throttle_add_new_msg, _Config) ->
     ok = snabbkaffe:stop(),
     {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
     ok;
+end_per_testcase(t_update_time_window, _Config) ->
+    ok = snabbkaffe:stop(),
+    {ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}),
+    ok;
 end_per_testcase(_TC, _Config) ->
     ok = snabbkaffe:stop().
 
@@ -87,7 +93,7 @@ t_throttle(_Config) ->
                 lists:seq(1, 100)
             ),
             {ok, _} = ?block_until(
-                #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 3000
+                #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000
             ),
 
             ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
@@ -110,7 +116,7 @@ t_throttle_add_new_msg(_Config) ->
     ?check_trace(
         begin
             ?block_until(
-                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 3000
+                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
             ),
             ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
             ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
@@ -128,11 +134,25 @@ t_throttle_add_new_msg(_Config) ->
 
 t_throttle_no_msg(_Config) ->
     %% Must simply pass with no crashes
-    ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")),
-    ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")),
+    ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
+    ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
     timer:sleep(10),
     ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
 
+t_update_time_window(_Config) ->
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}),
+                #{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000},
+                5000
+            ),
+            timer:sleep(10),
+            ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler)))
+        end,
+        []
+    ).
+
 %%--------------------------------------------------------------------
 %% internal functions
 %%--------------------------------------------------------------------

+ 15 - 7
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -75,6 +75,14 @@
 %% 1 million default ports counter
 -define(DEFAULT_MAX_PORTS, 1024 * 1024).
 
+-define(LOG_THROTTLING_MSGS, [
+    authorization_permission_denied,
+    cannot_publish_to_topic_due_to_not_authorized,
+    cannot_publish_to_topic_due_to_quota_exceeded,
+    connection_rejected_due_to_license_limit_reached,
+    dropped_msg_due_to_mqueue_is_full
+]).
+
 %% Callback to upgrade config after loaded from config file but before validation.
 upgrade_raw_conf(Raw0) ->
     Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
@@ -910,7 +918,7 @@ fields("log") ->
                     importance => ?IMPORTANCE_HIGH
                 }
             )},
-        {"throttling",
+        {throttling,
             sc(?R_REF("log_throttling"), #{
                 desc => ?DESC("log_throttling"),
                 importance => ?IMPORTANCE_MEDIUM
@@ -1019,22 +1027,22 @@ fields("log_burst_limit") ->
     ];
 fields("log_throttling") ->
     [
-        {"window_time",
+        {time_window,
             sc(
                 emqx_schema:duration_s(),
                 #{
                     default => <<"1m">>,
-                    desc => ?DESC("log_throttling_window_time"),
+                    desc => ?DESC("log_throttling_time_window"),
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
-        %% A static list of event ids used in ?SLOG_THROTTLE/3,4 macro.
+        %% A static list of msgs used in ?SLOG_THROTTLE/2,3 macro.
         %% For internal (developer) use only.
-        {"event_ids",
+        {msgs,
             sc(
-                hoconsc:array(atom()),
+                hoconsc:array(hoconsc:enum(?LOG_THROTTLING_MSGS)),
                 #{
-                    default => [],
+                    default => ?LOG_THROTTLING_MSGS,
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}

+ 4 - 1
apps/emqx_license/src/emqx_license.erl

@@ -85,7 +85,10 @@ check(_ConnInfo, AckProps) ->
         {ok, #{max_connections := MaxClients}} ->
             case check_max_clients_exceeded(MaxClients) of
                 true ->
-                    ?SLOG(info, #{msg => "connection_rejected_due_to_license_limit_reached"}),
+                    ?SLOG_THROTTLE(
+                        error,
+                        #{msg => connection_rejected_due_to_license_limit_reached}
+                    ),
                     {stop, {error, ?RC_QUOTA_EXCEEDED}};
                 false ->
                     {ok, AckProps}

+ 4 - 4
rel/i18n/emqx_conf_schema.hocon

@@ -482,11 +482,11 @@ desc_log_throttling.desc:
 """Log throttling feature reduces the number of potentially flooding logged events by
 dropping all but the first event within a configured time window."""
 
-log_throttling_window_time.desc:
-"""A time interval at which log throttling is applied. Defaults to 1 minute."""
+log_throttling_time_window.desc:
+"""For throttled messages, only log 1 in each time window."""
 
-log_throttling_window_time.label:
-"""Log Throttling Window Time"""
+log_throttling_time_window.label:
+"""Log Throttling Time Window"""
 
 cluster_dns_record_type.desc:
 """DNS record type."""