Просмотр исходного кода

feat(retainer): limit GC process runtime by default

...Through forcing it to stop after specific (e.g. 50000) number of
cleared retained messages.
Andrew Mayorov 1 год назад
Родитель
Сommit
d2dd7a34e9

+ 24 - 9
apps/emqx_retainer/src/emqx_retainer_gc.erl

@@ -19,6 +19,7 @@
 -behaviour(gen_server).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([start_link/2]).
 
@@ -30,13 +31,23 @@
     handle_info/2
 ]).
 
--export_type([opts/0]).
+-export_type([opts/0, limit/0]).
 
--type opts() :: #{deadline => emqx_retainer:deadline()}.
+-type limit() :: all | non_neg_integer().
+-type opts() :: #{
+    deadline := emqx_retainer:deadline(),
+    limit => limit()
+}.
 
--type backend_state() :: term().
+-callback clear_expired(_BackendState, emqx_retainer:deadline(), limit()) ->
+    {_Complete :: boolean(), _NCleared :: non_neg_integer()}.
 
--callback clear_expired(backend_state(), emqx_retainer:deadline()) -> ok.
+-define(DEFAULT_CLEAR_EXPIRED_LIMIT, 50_000).
+
+-ifdef(TEST).
+-undef(DEFAULT_CLEAR_EXPIRED_LIMIT).
+-define(DEFAULT_CLEAR_EXPIRED_LIMIT, 100).
+-endif.
 
 %%------------------------------------------------------------------------------
 %% APIs
@@ -68,8 +79,11 @@ handle_call(Req, _From, State) ->
     {reply, ignored, State}.
 
 handle_cast(clear_expired, State = {Context, Opts}) ->
-    Deadline = maps:get(deadline, Opts),
-    Result = clear_expired(Context, Deadline),
+    Result = {Complete, NCleared} = clear_expired(Context, Opts),
+    ?tp(debug, emqx_retainer_cleared_expired, #{
+        complete => Complete,
+        n_cleared => NCleared
+    }),
     {stop, {shutdown, Result}, State};
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
@@ -79,8 +93,9 @@ handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
--spec clear_expired(emqx_retainer:context(), emqx_retainer:deadline()) -> ok.
-clear_expired(Context, Deadline) ->
+clear_expired(Context, Opts) ->
     Mod = emqx_retainer:backend_module(Context),
     BackendState = emqx_retainer:backend_state(Context),
-    ok = Mod:clear_expired(BackendState, Deadline).
+    Deadline = maps:get(deadline, Opts),
+    Limit = maps:get(limit, Opts, ?DEFAULT_CLEAR_EXPIRED_LIMIT),
+    Mod:clear_expired(BackendState, Deadline, Limit).

+ 19 - 7
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -16,8 +16,6 @@
 
 -module(emqx_retainer_mnesia).
 
--behaviour(emqx_retainer).
--behaviour(emqx_retainer_gc).
 -behaviour(emqx_db_backup).
 
 -include("emqx_retainer.hrl").
@@ -25,7 +23,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 
-%% emqx_retainer callbacks
+-behaviour(emqx_retainer).
 -export([
     create/1,
     update/2,
@@ -36,11 +34,15 @@
     page_read/5,
     match_messages/3,
     delete_cursor/2,
-    clear_expired/2,
     clean/1,
     size/1
 ]).
 
+-behaviour(emqx_retainer_gc).
+-export([
+    clear_expired/3
+]).
+
 %% Internal exports (RPC)
 -export([
     do_populate_index_meta/1,
@@ -208,7 +210,7 @@ store_retained(State, Msg = #message{topic = Topic}) ->
             ok
     end.
 
-clear_expired(_State, Deadline) ->
+clear_expired(_State, Deadline, Limit) ->
     S0 = ets_stream(?TAB_MESSAGE),
     S1 = emqx_utils_stream:filter(
         fun(#retained_message{expiry_time = ExpiryTime}) ->
@@ -217,12 +219,22 @@ clear_expired(_State, Deadline) ->
         S0
     ),
     DirtyWriteIndices = dirty_indices(write),
-    emqx_utils_stream:foreach(
+    S2 = emqx_utils_stream:map(
         fun(RetainedMsg) ->
             delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
         end,
         S1
-    ).
+    ),
+    CountF = fun(_, N) -> N + 1 end,
+    case Limit of
+        all ->
+            NCleared = emqx_utils_stream:fold(CountF, 0, S2),
+            {_Complete = true, NCleared};
+        Num ->
+            {NCleared, SRest} = emqx_utils_stream:fold(CountF, 0, Num, S2),
+            Complete = SRest == [],
+            {Complete, NCleared}
+    end.
 
 delete_message(_State, Topic) ->
     Tokens = topic_to_tokens(Topic),