|
|
@@ -49,8 +49,15 @@
|
|
|
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
|
|
|
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
|
|
|
|
|
|
--spec allow(atom(), any()) -> boolean().
|
|
|
-allow(Msg, UniqueKey) when is_atom(Msg) ->
|
|
|
+%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
|
|
|
+%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
|
|
|
+%% for predefined message IDs.
|
|
|
+%% For relatively static resources created from configurations such as data integration
|
|
|
+%% resource IDs `UniqueKey' should be of `binary()' type.
|
|
|
+-spec allow(atom(), undefined | binary()) -> boolean().
|
|
|
+allow(Msg, UniqueKey) when
|
|
|
+ is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
|
|
|
+->
|
|
|
case emqx_logger:get_primary_log_level() of
|
|
|
debug ->
|
|
|
true;
|
|
|
@@ -68,7 +75,7 @@ start_link() ->
|
|
|
|
|
|
init([]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
- ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST),
|
|
|
+ ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
|
|
|
CurrentPeriodMs = ?TIME_WINDOW_MS,
|
|
|
TimerRef = schedule_refresh(CurrentPeriodMs),
|
|
|
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
|
|
|
@@ -86,15 +93,16 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
|
|
|
DroppedStats = lists:foldl(
|
|
|
fun(Msg, Acc) ->
|
|
|
case ?GET_SEQ(Msg) of
|
|
|
- %% Should not happen, unless the static ids list is updated at run-time.
|
|
|
undefined ->
|
|
|
+ %% Should not happen, unless the static ids list is updated at run-time.
|
|
|
new_throttler(Msg),
|
|
|
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
|
|
|
Acc;
|
|
|
SeqMap when is_map(SeqMap) ->
|
|
|
maps:fold(
|
|
|
fun(Key, Ref, Acc0) ->
|
|
|
- drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0)
|
|
|
+ ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
|
|
|
+ drop_stats(Ref, ID, Acc0)
|
|
|
end,
|
|
|
Acc,
|
|
|
SeqMap
|
|
|
@@ -124,27 +132,9 @@ drop_stats(SeqRef, Msg, Acc) ->
|
|
|
maybe_add_dropped(Msg, Dropped, Acc).
|
|
|
|
|
|
terminate(_Reason, _State) ->
|
|
|
- lists:foreach(
|
|
|
- fun(Msg) ->
|
|
|
- case ?GET_SEQ(Msg) of
|
|
|
- undefined ->
|
|
|
- ok;
|
|
|
- SeqMap when is_map(SeqMap) ->
|
|
|
- maps:foreach(
|
|
|
- fun(_, Ref) ->
|
|
|
- ok = ?RESET_SEQ(Ref)
|
|
|
- end,
|
|
|
- SeqMap
|
|
|
- );
|
|
|
- SeqRef ->
|
|
|
- %% atomics don't have erase API...
|
|
|
- %% (if nobody hold the ref, the atomics should erase automatically?)
|
|
|
- ok = ?RESET_SEQ(SeqRef)
|
|
|
- end,
|
|
|
- ?ERASE_SEQ(Msg)
|
|
|
- end,
|
|
|
- ?MSGS_LIST
|
|
|
- ),
|
|
|
+ %% atomics do not have delete/remove/release/deallocate API
|
|
|
+ %% after the reference is garbage-collected the resource is released
|
|
|
+ lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
|
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
@@ -198,9 +188,9 @@ schedule_refresh(PeriodMs) ->
|
|
|
erlang:send_after(PeriodMs, ?MODULE, refresh).
|
|
|
|
|
|
new_throttler(unrecoverable_resource_error = Msg) ->
|
|
|
- persistent_term:put(?SEQ_ID(Msg), #{});
|
|
|
+ new_throttler(Msg, #{});
|
|
|
new_throttler(Msg) ->
|
|
|
- persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ).
|
|
|
+ new_throttler(Msg, ?NEW_SEQ).
|
|
|
|
|
|
-new_throttler(Msg, Map) ->
|
|
|
- persistent_term:put(?SEQ_ID(Msg), Map).
|
|
|
+new_throttler(Msg, AtomicOrEmptyMap) ->
|
|
|
+ persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).
|