Parcourir la source

Merge pull request #13528 from zmstone/0726-unrecoverable-error-limit

0726 unrecoverable error limit
zmstone il y a 1 an
Parent
commit
7f7d0741d2

+ 6 - 2
apps/emqx/include/logger.hrl

@@ -38,16 +38,20 @@
 ).
 
 %% NOTE: do not forget to use atom for msg and add every used msg to
-%% the default value of `log.thorttling.msgs` list.
+%% the default value of `log.throttling.msgs` list.
 -define(SLOG_THROTTLE(Level, Data),
     ?SLOG_THROTTLE(Level, Data, #{})
 ).
 
 -define(SLOG_THROTTLE(Level, Data, Meta),
+    ?SLOG_THROTTLE(Level, undefined, Data, Meta)
+).
+
+-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
     case logger:allow(Level, ?MODULE) of
         true ->
             (fun(#{msg := __Msg} = __Data) ->
-                case emqx_log_throttler:allow(__Msg) of
+                case emqx_log_throttler:allow(__Msg, UniqueKey) of
                     true ->
                         logger:log(Level, __Data, Meta);
                     false ->

+ 56 - 16
apps/emqx/src/emqx_log_throttler.erl

@@ -25,7 +25,7 @@
 -export([start_link/0]).
 
 %% throttler API
--export([allow/1]).
+-export([allow/2]).
 
 %% gen_server callbacks
 -export([
@@ -40,23 +40,29 @@
 -define(SEQ_ID(Msg), {?MODULE, Msg}).
 -define(NEW_SEQ, atomics:new(1, [{signed, false}])).
 -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
+-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
 -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
 -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
 -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
 -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
 
--define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-
 -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
 -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
 
--spec allow(atom()) -> boolean().
-allow(Msg) when is_atom(Msg) ->
+%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
+%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
+%% for predefined message IDs.
+%% For relatively static resources created from configurations such as data integration
+%% resource IDs `UniqueKey' should be of `binary()' type.
+-spec allow(atom(), undefined | binary()) -> boolean().
+allow(Msg, UniqueKey) when
+    is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
+->
     case emqx_logger:get_primary_log_level() of
         debug ->
             true;
         _ ->
-            do_allow(Msg)
+            do_allow(Msg, UniqueKey)
     end.
 
 -spec start_link() -> startlink_ret().
@@ -68,7 +74,8 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
+    process_flag(trap_exit, true),
+    ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
     CurrentPeriodMs = ?TIME_WINDOW_MS,
     TimerRef = schedule_refresh(CurrentPeriodMs),
     {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
     DroppedStats = lists:foldl(
         fun(Msg, Acc) ->
             case ?GET_SEQ(Msg) of
-                %% Should not happen, unless the static ids list is updated at run-time.
                 undefined ->
-                    ?NEW_THROTTLE(Msg, ?NEW_SEQ),
+                    %% Should not happen, unless the static ids list is updated at run-time.
+                    new_throttler(Msg),
                     ?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
                     Acc;
+                SeqMap when is_map(SeqMap) ->
+                    maps:fold(
+                        fun(Key, Ref, Acc0) ->
+                            ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
+                            drop_stats(Ref, ID, Acc0)
+                        end,
+                        Acc,
+                        SeqMap
+                    );
                 SeqRef ->
-                    Dropped = ?GET_DROPPED(SeqRef),
-                    ok = ?RESET_SEQ(SeqRef),
-                    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
-                    maybe_add_dropped(Msg, Dropped, Acc)
+                    drop_stats(SeqRef, Msg, Acc)
             end
         end,
         #{},
@@ -112,7 +125,16 @@ handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unxpected_info", info => Info}),
     {noreply, State}.
 
+drop_stats(SeqRef, Msg, Acc) ->
+    Dropped = ?GET_DROPPED(SeqRef),
+    ok = ?RESET_SEQ(SeqRef),
+    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
+    maybe_add_dropped(Msg, Dropped, Acc).
+
 terminate(_Reason, _State) ->
+    %% atomics do not have delete/remove/release/deallocate API
+    %% after the reference is garbage-collected the resource is released
+    lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
 %% internal functions
 %%--------------------------------------------------------------------
 
-do_allow(Msg) ->
+do_allow(Msg, UniqueKey) ->
     case persistent_term:get(?SEQ_ID(Msg), undefined) of
         undefined ->
             %% This is either a race condition (emqx_log_throttler is not started yet)
             %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
             %% not added to the default value of `log.throttling.msgs`.
-            ?SLOG(info, #{
-                msg => "missing_log_throttle_sequence",
+            ?SLOG(debug, #{
+                msg => "log_throttle_disabled",
                 throttled_msg => Msg
             }),
             true;
+        %% e.g: unrecoverable msg throttle according resource_id
+        SeqMap when is_map(SeqMap) ->
+            case maps:find(UniqueKey, SeqMap) of
+                {ok, SeqRef} ->
+                    ?IS_ALLOWED(SeqRef);
+                error ->
+                    SeqRef = ?NEW_SEQ,
+                    new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
+                    true
+            end;
         SeqRef ->
             ?IS_ALLOWED(SeqRef)
     end.
@@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
 schedule_refresh(PeriodMs) ->
     ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
     erlang:send_after(PeriodMs, ?MODULE, refresh).
+
+new_throttler(unrecoverable_resource_error = Msg) ->
+    new_throttler(Msg, #{});
+new_throttler(Msg) ->
+    new_throttler(Msg, ?NEW_SEQ).
+
+new_throttler(Msg, AtomicOrEmptyMap) ->
+    persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).

+ 65 - 13
apps/emqx/test/emqx_log_throttler_SUITE.erl

@@ -26,6 +26,7 @@
 %% Have to use real msgs, as the schema is guarded by enum.
 -define(THROTTLE_MSG, authorization_permission_denied).
 -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
+-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
 -define(TIME_WINDOW, <<"1s">>).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
@@ -59,6 +60,11 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     emqx_config:delete_override_conf_files().
 
+init_per_testcase(t_throttle_recoverable_msg, Config) ->
+    ok = snabbkaffe:start_trace(),
+    [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
+    Config;
 init_per_testcase(t_throttle_add_new_msg, Config) ->
     ok = snabbkaffe:start_trace(),
     [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
     ok = snabbkaffe:start_trace(),
     Config.
 
+end_per_testcase(t_throttle_recoverable_msg, _Config) ->
+    ok = snabbkaffe:stop(),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
+    ok;
 end_per_testcase(t_throttle_add_new_msg, _Config) ->
     ok = snabbkaffe:stop(),
     {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@@ -101,8 +111,8 @@ t_throttle(_Config) ->
                 5000
             ),
 
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -115,14 +125,48 @@ t_throttle(_Config) ->
         []
     ).
 
+t_throttle_recoverable_msg(_Config) ->
+    ResourceId = <<"resource_id">>,
+    ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
+    ?check_trace(
+        begin
+            %% Warm-up and block to increase the probability that next events
+            %% will be in the same throttling time window.
+            {ok, _} = ?block_until(
+                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
+                5000
+            ),
+            {_, {ok, _}} = ?wait_async_action(
+                events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg
+                },
+                5000
+            ),
+
+            ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg,
+                    dropped_count := 1
+                },
+                3000
+            )
+        end,
+        []
+    ).
+
 t_throttle_add_new_msg(_Config) ->
     ?check_trace(
         begin
             {ok, _} = ?block_until(
                 #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
 
 t_throttle_no_msg(_Config) ->
     %% Must simply pass with no crashes
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
-    timer:sleep(10),
-    ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
+    Pid = erlang:whereis(emqx_log_throttler),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
+    %% assert process is not restarted
+    ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
+    %% make a gen_call to ensure the process is alive
+    %% note: this call result in an 'unexpected_call' error log.
+    ?assertEqual(ignored, gen_server:call(Pid, probe)),
+    ok.
 
 t_update_time_window(_Config) ->
     ?check_trace(
@@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
                 #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
                 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
 %%--------------------------------------------------------------------
 
 events(Msg) ->
-    events(100, Msg).
+    events(100, Msg, undefined).
+
+events(Msg, Id) ->
+    events(100, Msg, Id).
 
-events(N, Msg) ->
-    [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
+events(N, Msg, Id) ->
+    [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
 
 module_exists(Mod) ->
     case erlang:module_loaded(Mod) of

+ 2 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -80,7 +80,8 @@
     cannot_publish_to_topic_due_to_not_authorized,
     cannot_publish_to_topic_due_to_quota_exceeded,
     connection_rejected_due_to_license_limit_reached,
-    dropped_msg_due_to_mqueue_is_full
+    dropped_msg_due_to_mqueue_is_full,
+    unrecoverable_resource_error
 ]).
 
 %% Callback to upgrade config after loaded from config file but before validation.

+ 54 - 24
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) ->
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 running(info, Info, _St) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
@@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) ->
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 blocked(info, Info, _Data) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
@@ -981,7 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =
@@ -1021,7 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =
@@ -1141,12 +1159,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte
         false ->
             ok;
         true ->
-            ?SLOG(info, #{
-                msg => "buffer_worker_dropped_expired_messages",
-                resource_id => Id,
-                worker_index => Index,
-                expired_count => ExpiredCount
-            }),
+            ?SLOG(
+                info,
+                #{
+                    msg => "buffer_worker_dropped_expired_messages",
+                    resource_id => Id,
+                    worker_index => Index,
+                    expired_count => ExpiredCount
+                },
+                #{tag => ?TAG}
+            ),
             ok
     end.
 
@@ -1556,7 +1578,7 @@ handle_async_reply1(
     case is_expired(ExpireAt, Now) of
         true ->
             IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
-            %% evalutate metrics call here since we're not inside
+            %% evaluate metrics call here since we're not inside
             %% buffer worker
             IsAcked andalso
                 begin
@@ -1797,12 +1819,16 @@ append_queue(Id, Index, Q, Queries) ->
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
                 Counters = #{dropped_queue_full => Dropped},
-                ?SLOG(info, #{
-                    msg => "buffer_worker_overflow",
-                    resource_id => Id,
-                    worker_index => Index,
-                    dropped => Dropped
-                }),
+                ?SLOG(
+                    info,
+                    #{
+                        msg => "buffer_worker_overflow",
+                        resource_id => Id,
+                        worker_index => Index,
+                        dropped => Dropped
+                    },
+                    #{tag => ?TAG}
+                ),
                 {Items2, Q1, Counters}
         end,
     ?tp(
@@ -2236,11 +2262,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) ->
     BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
     case BatchTime =:= BatchTime0 of
         false ->
-            ?SLOG(info, #{
-                id => Id,
-                msg => "adjusting_buffer_worker_batch_time",
-                new_batch_time => BatchTime
-            });
+            ?SLOG(
+                info,
+                #{
+                    resource_id => Id,
+                    msg => "adjusting_buffer_worker_batch_time",
+                    new_batch_time => BatchTime
+                },
+                #{tag => ?TAG}
+            );
         true ->
             ok
     end,

+ 1 - 0
changes/ce/feat-13528.en.md

@@ -0,0 +1 @@
+Add log throttling for data integration unrecoverable errors.