Просмотр исходного кода

feat: implement log throttling

Serge Tupchii 2 лет назад
Родитель
Сommit
98ba300f7c

+ 15 - 0
apps/emqx/include/logger.hrl

@@ -40,6 +40,21 @@
     end
 ).
 
+%% NOTE: do not forget to add every used msg to the default value of
+%% `log.thorttling.msgs` list.
+-define(SLOG_THROTTLE(Level, Data),
+    ?SLOG_THROTTLE(Level, Data, #{})
+).
+
+-define(SLOG_THROTTLE(Level, Data, Meta),
+    case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of
+        true ->
+            ?SLOG(Level, Data, Meta);
+        false ->
+            ok
+    end
+).
+
 -define(AUDIT_HANDLER, emqx_audit).
 -define(TRACE_FILTER, emqx_trace_filter).
 -define(OWN_KEYS, [level, filters, filter_default, handlers]).

+ 2 - 1
apps/emqx/src/emqx_kernel_sup.erl

@@ -40,7 +40,8 @@ init([]) ->
             child_spec(emqx_authn_authz_metrics_sup, supervisor),
             child_spec(emqx_ocsp_cache, worker),
             child_spec(emqx_crl_cache, worker),
-            child_spec(emqx_tls_lib_sup, supervisor)
+            child_spec(emqx_tls_lib_sup, supervisor),
+            child_spec(emqx_log_throttler, worker)
         ]
     }}.
 

+ 146 - 0
apps/emqx/src/emqx_log_throttler.erl

@@ -0,0 +1,146 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_log_throttler).
+
+-behaviour(gen_server).
+
+-include("logger.hrl").
+-include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([start_link/0]).
+
+%% throttler API
+-export([allow/2]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(SEQ_ID(Msg), {?MODULE, Msg}).
+-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
+-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
+-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
+-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
+-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
+-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
+
+-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
+
+-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
+-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
+
+-spec allow(logger:level(), string()) -> boolean().
+allow(debug, _Msg) ->
+    true;
+allow(_Level, Msg) ->
+    Seq = persistent_term:get(?SEQ_ID(Msg), undefined),
+    case Seq of
+        undefined ->
+            %% This is either a race condition (emqx_log_throttler is not started yet)
+            %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
+            %% not added to the default value of `log.throttling.msgs`.
+            ?SLOG(info, #{
+                msg => "missing_log_throttle_sequence",
+                throttled_msg => Msg
+            }),
+            true;
+        SeqRef ->
+            ?IS_ALLOWED(SeqRef)
+    end.
+
+-spec start_link() -> startlink_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
+    TimerRef = schedule_refresh(?TIME_WINDOW_MS),
+    {ok, #{timer_ref => TimerRef}}.
+
+handle_call(Req, _From, State) ->
+    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
+    {noreply, State}.
+
+handle_info(refresh, State) ->
+    PeriodMs = ?TIME_WINDOW_MS,
+    Msgs = ?MSGS_LIST,
+    DroppedStats = lists:foldl(
+        fun(Msg, Acc) ->
+            case ?GET_SEQ(Msg) of
+                %% Should not happen, unless the static ids list is updated at run-time.
+                undefined ->
+                    ?NEW_THROTTLE(Msg, ?NEW_SEQ),
+                    ?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
+                    Acc;
+                SeqRef ->
+                    Dropped = ?GET_DROPPED(SeqRef),
+                    ok = ?RESET_SEQ(SeqRef),
+                    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
+                    maybe_add_dropped(Msg, Dropped, Acc)
+            end
+        end,
+        #{},
+        Msgs
+    ),
+    maybe_log_dropped(DroppedStats, PeriodMs),
+    State1 = State#{timer_ref => schedule_refresh(PeriodMs)},
+    {noreply, State1};
+handle_info(Info, State) ->
+    ?SLOG(error, #{msg => "unxpected_info", info => Info}),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% internal functions
+%%--------------------------------------------------------------------
+
+maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 ->
+    DroppedAcc#{Msg => Dropped};
+maybe_add_dropped(_Msg, _Dropped, DroppedAcc) ->
+    DroppedAcc.
+
+maybe_log_dropped(DroppedStats, PeriodMs) when map_size(DroppedStats) > 0 ->
+    ?SLOG(warning, #{
+        msg => "log_events_throttled_during_last_period",
+        dropped => DroppedStats,
+        period => emqx_utils_calendar:human_readable_duration_string(PeriodMs)
+    });
+maybe_log_dropped(_DroppedStats, _PeriodMs) ->
+    ok.
+
+schedule_refresh(PeriodMs) ->
+    erlang:send_after(PeriodMs, ?MODULE, refresh).

+ 150 - 0
apps/emqx/test/emqx_log_throttler_SUITE.erl

@@ -0,0 +1,150 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_log_throttler_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(THROTTLE_MSG, "test_throttle_msg").
+-define(THROTTLE_MSG1, "test_throttle_msg1").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    %% This test suite can't be run in standalone tests (without emqx_conf)
+    case module_exists(emqx_conf) of
+        true ->
+            Apps = emqx_cth_suite:start(
+                [
+                    {emqx_conf, #{
+                        config =>
+                            #{
+                                log => #{
+                                    throttling => #{
+                                        time_window => <<"1s">>, msgs => [?THROTTLE_MSG]
+                                    }
+                                }
+                            }
+                    }},
+                    emqx
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config)}
+            ),
+            [{suite_apps, Apps} | Config];
+        false ->
+            {skip, standalone_not_supported}
+    end.
+
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
+    emqx_config:delete_override_conf_files().
+
+init_per_testcase(t_throttle_add_new_msg, Config) ->
+    ok = snabbkaffe:start_trace(),
+    [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}),
+    Config;
+init_per_testcase(_TC, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(t_throttle_add_new_msg, _Config) ->
+    ok = snabbkaffe:stop(),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
+    ok;
+end_per_testcase(_TC, _Config) ->
+    ok = snabbkaffe:stop().
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_throttle(_Config) ->
+    ?check_trace(
+        begin
+            %% Warm-up and block to increase the probability that next events
+            %% will be in the same throttling time window.
+            lists:foreach(
+                fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end,
+                lists:seq(1, 100)
+            ),
+            {ok, _} = ?block_until(
+                #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 3000
+            ),
+
+            ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
+            ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
+            %% Debug is always allowed
+            ?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)),
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ?THROTTLE_MSG,
+                    dropped_count := 1
+                },
+                3000
+            )
+        end,
+        []
+    ).
+
+t_throttle_add_new_msg(_Config) ->
+    ?check_trace(
+        begin
+            ?block_until(
+                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 3000
+            ),
+            ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
+            ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ?THROTTLE_MSG1,
+                    dropped_count := 1
+                },
+                3000
+            )
+        end,
+        []
+    ).
+
+t_throttle_no_msg(_Config) ->
+    %% Must simply pass with no crashes
+    ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")),
+    ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")),
+    timer:sleep(10),
+    ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
+
+%%--------------------------------------------------------------------
+%% internal functions
+%%--------------------------------------------------------------------
+
+module_exists(Mod) ->
+    case erlang:module_loaded(Mod) of
+        true ->
+            true;
+        false ->
+            case code:ensure_loaded(Mod) of
+                ok -> true;
+                {module, Mod} -> true;
+                _ -> false
+            end
+    end.

+ 30 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -909,7 +909,12 @@ fields("log") ->
                     aliases => [file_handlers],
                     importance => ?IMPORTANCE_HIGH
                 }
-            )}
+            )},
+        {"throttling",
+            sc(?R_REF("log_throttling"), #{
+                desc => ?DESC("log_throttling"),
+                importance => ?IMPORTANCE_MEDIUM
+            })}
     ];
 fields("console_handler") ->
     log_handler_common_confs(console, #{});
@@ -1012,6 +1017,28 @@ fields("log_burst_limit") ->
                 }
             )}
     ];
+fields("log_throttling") ->
+    [
+        {"window_time",
+            sc(
+                emqx_schema:duration_s(),
+                #{
+                    default => <<"1m">>,
+                    desc => ?DESC("log_throttling_window_time"),
+                    importance => ?IMPORTANCE_MEDIUM
+                }
+            )},
+        %% A static list of event ids used in ?SLOG_THROTTLE/3,4 macro.
+        %% For internal (developer) use only.
+        {"event_ids",
+            sc(
+                hoconsc:array(atom()),
+                #{
+                    default => [],
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ];
 fields("authorization") ->
     emqx_schema:authz_fields() ++
         emqx_authz_schema:authz_fields().
@@ -1046,6 +1073,8 @@ desc("log_burst_limit") ->
     ?DESC("desc_log_burst_limit");
 desc("authorization") ->
     ?DESC("desc_authorization");
+desc("log_throttling") ->
+    ?DESC("desc_log_throttling");
 desc(_) ->
     undefined.
 

+ 7 - 1
apps/emqx_conf/test/emqx_conf_logger_SUITE.erl

@@ -35,6 +35,10 @@
          level = info
          path = \"log/emqx.log\"
        }
+      throttling {
+         msgs = []
+         time_window = 1m
+      }
     }
     ").
 
@@ -84,7 +88,9 @@ t_log_conf(_Conf) ->
                 <<"time_offset">> => <<"system">>
             },
         <<"file">> =>
-            #{<<"default">> => FileExpect}
+            #{<<"default">> => FileExpect},
+        <<"throttling">> =>
+            #{<<"time_window">> => <<"1m">>, <<"msgs">> => []}
     },
     ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
     UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1),

+ 4 - 1
apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl

@@ -102,5 +102,8 @@ t_audit_log_conf(_Config) ->
                 <<"time_offset">> => <<"system">>
             }
     },
-    ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
+    %% The default value of throttling.msgs can be frequently updated,
+    %% remove it here, otherwise this test needs to be updated each time
+    %% a new throttle event is added.
+    ?assertEqual(ExpectLog1, maps:remove(<<"throttling">>, emqx_conf:get_raw([<<"log">>]))),
     ok.

+ 2 - 0
changes/ce/feat-12520.en.md

@@ -0,0 +1,2 @@
+Implement log throttling. The feature reduces the number of potentially flooding logged events by
+dropping all but the first event within a configured time window.

+ 13 - 0
rel/i18n/emqx_conf_schema.hocon

@@ -475,6 +475,19 @@ log_burst_limit_window_time.desc:
 log_burst_limit_window_time.label:
 """Window Time"""
 
+desc_log_throttling.label:
+"""Log Throttling"""
+
+desc_log_throttling.desc:
+"""Log throttling feature reduces the number of potentially flooding logged events by
+dropping all but the first event within a configured time window."""
+
+log_throttling_window_time.desc:
+"""A time interval at which log throttling is applied. Defaults to 1 minute."""
+
+log_throttling_window_time.label:
+"""Log Throttling Window Time"""
+
 cluster_dns_record_type.desc:
 """DNS record type."""