Browse Source

feat: throttle with resource_id

zhongwencool 1 year ago
parent
commit
f6f1d32da0

+ 6 - 2
apps/emqx/include/logger.hrl

@@ -38,16 +38,20 @@
 ).
 
 %% NOTE: do not forget to use atom for msg and add every used msg to
-%% the default value of `log.thorttling.msgs` list.
+%% the default value of `log.throttling.msgs` list.
 -define(SLOG_THROTTLE(Level, Data),
     ?SLOG_THROTTLE(Level, Data, #{})
 ).
 
 -define(SLOG_THROTTLE(Level, Data, Meta),
+    ?SLOG_THROTTLE(Level, undefined, Data, Meta)
+).
+
+-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
     case logger:allow(Level, ?MODULE) of
         true ->
             (fun(#{msg := __Msg} = __Data) ->
-                case emqx_log_throttler:allow(__Msg) of
+                case emqx_log_throttler:allow(__Msg, UniqueKey) of
                     true ->
                         logger:log(Level, __Data, Meta);
                     false ->

+ 65 - 15
apps/emqx/src/emqx_log_throttler.erl

@@ -25,7 +25,7 @@
 -export([start_link/0]).
 
 %% throttler API
--export([allow/1]).
+-export([allow/2]).
 
 %% gen_server callbacks
 -export([
@@ -40,23 +40,22 @@
 -define(SEQ_ID(Msg), {?MODULE, Msg}).
 -define(NEW_SEQ, atomics:new(1, [{signed, false}])).
 -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
+-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
 -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
 -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
 -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
 -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
 
--define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-
 -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
 -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
 
--spec allow(atom()) -> boolean().
-allow(Msg) when is_atom(Msg) ->
+-spec allow(atom(), any()) -> boolean().
+allow(Msg, UniqueKey) when is_atom(Msg) ->
     case emqx_logger:get_primary_log_level() of
         debug ->
             true;
         _ ->
-            do_allow(Msg)
+            do_allow(Msg, UniqueKey)
     end.
 
 -spec start_link() -> startlink_ret().
@@ -68,7 +67,8 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
+    process_flag(trap_exit, true),
+    ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST),
     CurrentPeriodMs = ?TIME_WINDOW_MS,
     TimerRef = schedule_refresh(CurrentPeriodMs),
     {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@@ -88,14 +88,19 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
             case ?GET_SEQ(Msg) of
                 %% Should not happen, unless the static ids list is updated at run-time.
                 undefined ->
-                    ?NEW_THROTTLE(Msg, ?NEW_SEQ),
+                    new_throttler(Msg),
                     ?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
                     Acc;
+                SeqMap when is_map(SeqMap) ->
+                    maps:fold(
+                        fun(Key, Ref, Acc0) ->
+                            drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0)
+                        end,
+                        Acc,
+                        SeqMap
+                    );
                 SeqRef ->
-                    Dropped = ?GET_DROPPED(SeqRef),
-                    ok = ?RESET_SEQ(SeqRef),
-                    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
-                    maybe_add_dropped(Msg, Dropped, Acc)
+                    drop_stats(SeqRef, Msg, Acc)
             end
         end,
         #{},
@@ -112,7 +117,34 @@ handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unxpected_info", info => Info}),
     {noreply, State}.
 
+drop_stats(SeqRef, Msg, Acc) ->
+    Dropped = ?GET_DROPPED(SeqRef),
+    ok = ?RESET_SEQ(SeqRef),
+    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
+    maybe_add_dropped(Msg, Dropped, Acc).
+
 terminate(_Reason, _State) ->
+    lists:foreach(
+        fun(Msg) ->
+            case ?GET_SEQ(Msg) of
+                undefined ->
+                    ok;
+                SeqMap when is_map(SeqMap) ->
+                    maps:foreach(
+                        fun(_, Ref) ->
+                            ok = ?RESET_SEQ(Ref)
+                        end,
+                        SeqMap
+                    );
+                SeqRef ->
+                    %% atomics don't have erase API...
+                    %% (if nobody hold the ref, the atomics should erase automatically?)
+                    ok = ?RESET_SEQ(SeqRef)
+            end,
+            ?ERASE_SEQ(Msg)
+        end,
+        ?MSGS_LIST
+    ),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -122,17 +154,27 @@ code_change(_OldVsn, State, _Extra) ->
 %% internal functions
 %%--------------------------------------------------------------------
 
-do_allow(Msg) ->
+do_allow(Msg, UniqueKey) ->
     case persistent_term:get(?SEQ_ID(Msg), undefined) of
         undefined ->
             %% This is either a race condition (emqx_log_throttler is not started yet)
             %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
             %% not added to the default value of `log.throttling.msgs`.
-            ?SLOG(info, #{
-                msg => "missing_log_throttle_sequence",
+            ?SLOG(debug, #{
+                msg => "log_throttle_disabled",
                 throttled_msg => Msg
             }),
             true;
+        %% e.g: unrecoverable msg throttle according resource_id
+        SeqMap when is_map(SeqMap) ->
+            case maps:find(UniqueKey, SeqMap) of
+                {ok, SeqRef} ->
+                    ?IS_ALLOWED(SeqRef);
+                error ->
+                    SeqRef = ?NEW_SEQ,
+                    new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
+                    true
+            end;
         SeqRef ->
             ?IS_ALLOWED(SeqRef)
     end.
@@ -154,3 +196,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
 schedule_refresh(PeriodMs) ->
     ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
     erlang:send_after(PeriodMs, ?MODULE, refresh).
+
+new_throttler(unrecoverable_resource_error = Msg) ->
+    persistent_term:put(?SEQ_ID(Msg), #{});
+new_throttler(Msg) ->
+    persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ).
+
+new_throttler(Msg, Map) ->
+    persistent_term:put(?SEQ_ID(Msg), Map).

+ 58 - 11
apps/emqx/test/emqx_log_throttler_SUITE.erl

@@ -26,6 +26,7 @@
 %% Have to use real msgs, as the schema is guarded by enum.
 -define(THROTTLE_MSG, authorization_permission_denied).
 -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
+-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
 -define(TIME_WINDOW, <<"1s">>).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
@@ -59,6 +60,11 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     emqx_config:delete_override_conf_files().
 
+init_per_testcase(t_throttle_recoverable_msg, Config) ->
+    ok = snabbkaffe:start_trace(),
+    [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
+    Config;
 init_per_testcase(t_throttle_add_new_msg, Config) ->
     ok = snabbkaffe:start_trace(),
     [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
     ok = snabbkaffe:start_trace(),
     Config.
 
+end_per_testcase(t_throttle_recoverable_msg, _Config) ->
+    ok = snabbkaffe:stop(),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
+    ok;
 end_per_testcase(t_throttle_add_new_msg, _Config) ->
     ok = snabbkaffe:stop(),
     {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@@ -101,8 +111,8 @@ t_throttle(_Config) ->
                 5000
             ),
 
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -115,14 +125,48 @@ t_throttle(_Config) ->
         []
     ).
 
+t_throttle_recoverable_msg(_Config) ->
+    ResourceId = <<"resource_id">>,
+    ThrottledMsg = emqx_utils:format("~ts:~s", [?THROTTLE_UNRECOVERABLE_MSG, ResourceId]),
+    ?check_trace(
+        begin
+            %% Warm-up and block to increase the probability that next events
+            %% will be in the same throttling time window.
+            {ok, _} = ?block_until(
+                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
+                5000
+            ),
+            {_, {ok, _}} = ?wait_async_action(
+                events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg
+                },
+                5000
+            ),
+
+            ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg,
+                    dropped_count := 1
+                },
+                3000
+            )
+        end,
+        []
+    ).
+
 t_throttle_add_new_msg(_Config) ->
     ?check_trace(
         begin
             {ok, _} = ?block_until(
                 #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -137,8 +181,8 @@ t_throttle_add_new_msg(_Config) ->
 
 t_throttle_no_msg(_Config) ->
     %% Must simply pass with no crashes
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
     timer:sleep(10),
     ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
 
@@ -168,8 +212,8 @@ t_throttle_debug_primary_level(_Config) ->
                 #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
                 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -187,10 +231,13 @@ t_throttle_debug_primary_level(_Config) ->
 %%--------------------------------------------------------------------
 
 events(Msg) ->
-    events(100, Msg).
+    events(100, Msg, undefined).
+
+events(Msg, Id) ->
+    events(100, Msg, Id).
 
-events(N, Msg) ->
-    [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
+events(N, Msg, Id) ->
+    [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
 
 module_exists(Mod) ->
     case erlang:module_loaded(Mod) of

+ 0 - 8
apps/emqx_resource/include/emqx_resource.hrl

@@ -167,12 +167,4 @@
 ).
 -define(TAG, "RESOURCE").
 
--define(LOG_LEVEL(_L_),
-    case _L_ of
-        true -> info;
-        false -> warning
-    end
-).
--define(TAG, "RESOURCE").
-
 -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).

+ 20 - 10
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -981,11 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG_THROTTLE(error, #{
-                        resource_id => Id,
-                        msg => unrecoverable_resource_error,
-                        reason => Reason
-                    }),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =
@@ -1025,11 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG_THROTTLE(error, #{
-                        resource_id => Id,
-                        msg => unrecoverable_resource_error,
-                        reason => Reason
-                    }),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =