瀏覽代碼

Merge pull request #13969 from keynslug/fix/EMQX-13213/retainer-gc

feat(retainer): clean expired messages in separate process
Andrew Mayorov 1 年之前
父節點
當前提交
58c138e12b

+ 45 - 30
apps/emqx_retainer/src/emqx_retainer.erl

@@ -42,6 +42,7 @@
     delete/1,
     read_message/1,
     page_read/3,
+    page_read/4,
     post_config_update/5,
     stats_fun/0,
     retained_count/0,
@@ -66,6 +67,7 @@
 ]).
 
 -export_type([
+    deadline/0,
     cursor/0,
     context/0
 ]).
@@ -88,6 +90,7 @@
 
 -type topic() :: emqx_types:topic().
 -type message() :: emqx_types:message().
+-type deadline() :: emqx_utils_calendar:epoch_millisecond().
 -type cursor() :: undefined | term().
 -type has_next() :: boolean().
 
@@ -100,12 +103,17 @@
 -callback close(backend_state()) -> ok.
 -callback delete_message(backend_state(), topic()) -> ok.
 -callback store_retained(backend_state(), message()) -> ok.
--callback read_message(backend_state(), topic()) -> {ok, list(message())}.
--callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
+-callback page_read(
+    backend_state(),
+    emqx_maybe:t(topic()),
+    deadline(),
+    non_neg_integer(),
+    non_neg_integer()
+) ->
     {ok, has_next(), list(message())}.
+-callback read_message(backend_state(), topic()) -> {ok, list(message())}.
 -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
 -callback delete_cursor(backend_state(), cursor()) -> ok.
--callback clear_expired(backend_state()) -> ok.
 -callback clean(backend_state()) -> ok.
 -callback size(backend_state()) -> non_neg_integer().
 
@@ -199,7 +207,12 @@ read_message(Topic) ->
 -spec page_read(emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
     {ok, has_next(), list(message())}.
 page_read(Topic, Page, Limit) ->
-    call({?FUNCTION_NAME, Topic, Page, Limit}).
+    page_read(Topic, erlang:system_time(millisecond), Page, Limit).
+
+%% @doc Same as page_read/3 but with an explicit `Deadline': sends a
+%% `page_read' request (including the deadline) via call/1.
+-spec page_read(emqx_maybe:t(topic()), deadline(), non_neg_integer(), non_neg_integer()) ->
+    {ok, has_next(), list(message())}.
+page_read(Topic, Deadline, Page, Limit) ->
+    Request = {page_read, Topic, Deadline, Page, Limit},
+    call(Request).
 
 -spec enabled() -> boolean().
 enabled() ->
@@ -255,8 +268,8 @@ handle_call({delete, Topic}, _, #{context := Context} = State) ->
     {reply, ok, State};
 handle_call({read_message, Topic}, _, #{context := Context} = State) ->
     {reply, read_message(Context, Topic), State};
-handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
-    {reply, page_read(Context, Topic, Page, Limit), State};
+handle_call({page_read, Topic, Deadline, Page, Limit}, _, #{context := Context} = State) ->
+    {reply, page_read(Context, Topic, Deadline, Page, Limit), State};
 handle_call(retained_count, _From, State = #{context := Context}) ->
     {reply, count(Context), State};
 handle_call(enabled, _From, State = #{enable := Enable}) ->
@@ -274,10 +287,10 @@ handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
 
-handle_info(clear_expired, #{context := Context} = State) ->
-    ok = clear_expired(Context),
+handle_info({timeout, TRef, clear_expired}, #{context := Context, clear_timer := TRef} = State) ->
+    ok = start_clear_expired(Context),
     Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
-    {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
+    {noreply, State#{clear_timer := maybe_start_timer(Interval, clear_expired)}, hibernate};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
@@ -308,7 +321,6 @@ new_state() ->
 payload_size_limit() ->
     emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE).
 
-%% @private
 dispatch(Context, Topic) ->
     emqx_retainer_dispatcher:dispatch(Context, Topic).
 
@@ -324,12 +336,12 @@ read_message(Context, Topic) ->
     BackendState = backend_state(Context),
     Mod:read_message(BackendState, Topic).
 
--spec page_read(context(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
+-spec page_read(context(), emqx_maybe:t(topic()), deadline(), non_neg_integer(), non_neg_integer()) ->
     {ok, has_next(), list(message())}.
-page_read(Context, Topic, Page, Limit) ->
+page_read(Context, Topic, Deadline, Page, Limit) ->
     Mod = backend_module(Context),
     BackendState = backend_state(Context),
-    Mod:page_read(BackendState, Topic, Page, Limit).
+    Mod:page_read(BackendState, Topic, Deadline, Page, Limit).
 
 -spec count(context()) -> non_neg_integer().
 count(Context) ->
@@ -337,11 +349,14 @@ count(Context) ->
     BackendState = backend_state(Context),
     Mod:size(BackendState).
 
--spec clear_expired(context()) -> ok.
-clear_expired(Context) ->
-    Mod = backend_module(Context),
-    BackendState = backend_state(Context),
-    ok = Mod:clear_expired(BackendState).
+%% Ask the retainer supervisor to start a GC run that clears messages expired
+%% as of now, bounded by `retainer.msg_clear_limit' (`all' when unset).
+-spec start_clear_expired(context()) -> ok.
+start_clear_expired(Context) ->
+    Deadline = erlang:system_time(millisecond),
+    Limit = emqx_conf:get([retainer, msg_clear_limit], all),
+    %% The start result is deliberately discarded: this function always
+    %% returns `ok', whatever emqx_retainer_sup:start_gc/2 reports.
+    _Result = emqx_retainer_sup:start_gc(Context, #{deadline => Deadline, limit => Limit}),
+    ok.
 
 -spec store_retained(context(), message()) -> ok.
 store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
@@ -399,7 +414,7 @@ update_config(
     case SameBackendType andalso ok =:= OldMod:update(Context, NewBackendConfig) of
         true ->
             State#{
-                clear_timer := check_timer(
+                clear_timer := update_timer(
                     ClearTimer,
                     ClearInterval,
                     clear_expired
@@ -421,7 +436,7 @@ enable_retainer(
     State#{
         enable := true,
         context := Context,
-        clear_timer := add_timer(ClearInterval, clear_expired)
+        clear_timer := maybe_start_timer(ClearInterval, clear_expired)
     }.
 
 -spec disable_retainer(state()) -> state().
@@ -442,23 +457,23 @@ disable_retainer(
 stop_timer(undefined) ->
     undefined;
 stop_timer(TimerRef) ->
-    _ = erlang:cancel_timer(TimerRef),
+    _ = emqx_utils:cancel_timer(TimerRef),
     undefined.
 
-add_timer(0, _) ->
+maybe_start_timer(0, _) ->
     undefined;
-add_timer(undefined, _) ->
+maybe_start_timer(undefined, _) ->
     undefined;
-add_timer(Ms, Content) ->
-    erlang:send_after(Ms, self(), Content).
+maybe_start_timer(Ms, Content) ->
+    emqx_utils:start_timer(Ms, self(), Content).
 
-check_timer(undefined, Ms, Context) ->
-    add_timer(Ms, Context);
-check_timer(Timer, 0, _) ->
+update_timer(undefined, Ms, Context) ->
+    maybe_start_timer(Ms, Context);
+update_timer(Timer, 0, _) ->
     stop_timer(Timer);
-check_timer(Timer, undefined, _) ->
+update_timer(Timer, undefined, _) ->
     stop_timer(Timer);
-check_timer(Timer, _, _) ->
+update_timer(Timer, _, _) ->
     Timer.
 
 -spec enabled_backend_config(hocon:config()) -> hocon:config() | no_return().

+ 94 - 0
apps/emqx_retainer/src/emqx_retainer_gc.erl

@@ -0,0 +1,94 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_gc).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([start_link/2]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-export_type([opts/0, limit/0]).
+
+-type limit() :: all | non_neg_integer().
+-type opts() :: #{
+    deadline := emqx_retainer:deadline(),
+    limit := limit()
+}.
+
+-callback clear_expired(_BackendState, emqx_retainer:deadline(), limit()) ->
+    {_Complete :: boolean(), _NCleared :: non_neg_integer()}.
+
+%%------------------------------------------------------------------------------
+%% APIs
+%%------------------------------------------------------------------------------
+
+%% @doc Start a GC worker, but only on the node that is_responsible/0 selects;
+%% on every other node `ignore' is returned and no process is started.
+-spec start_link(emqx_retainer:context(), opts()) -> {ok, pid()} | ignore.
+start_link(Context, Opts) ->
+    case is_responsible() of
+        false ->
+            ignore;
+        true ->
+            gen_server:start_link(?MODULE, {Context, Opts}, [])
+    end.
+
+%% True only when this node is the smallest (in term order) of the nodes
+%% returned by mria_membership:running_core_nodelist/0; false when that list
+%% is empty.
+is_responsible() ->
+    case mria_membership:running_core_nodelist() of
+        [] ->
+            false;
+        CoreNodes ->
+            lists:min(CoreNodes) =:= node()
+    end.
+
+%%------------------------------------------------------------------------------
+%% gen_server callbacks
+%%------------------------------------------------------------------------------
+
+%% Keep init/1 cheap: the actual clearing is triggered by a cast to self(),
+%% so it runs in handle_cast/2 after init/1 has returned.
+init({_Context, _Opts} = State) ->
+    ok = gen_server:cast(self(), clear_expired),
+    {ok, State}.
+
+%% No synchronous API is defined for this process: log the call and reply `ignored'.
+handle_call(Unexpected, _From, State) ->
+    ?SLOG(error, #{msg => "unexpected_call", call => Unexpected}),
+    {reply, ignored, State}.
+
+%% `clear_expired' is the cast that init/1 sends to self(); anything else is
+%% logged and ignored.
+handle_cast(clear_expired, {Context, Opts} = State) ->
+    {Complete, NCleared} = Outcome = clear_expired(Context, Opts),
+    ?tp(debug, emqx_retainer_cleared_expired, #{
+        n_cleared => NCleared,
+        complete => Complete
+    }),
+    %% One-shot worker: stop after a single pass, carrying the outcome in the
+    %% shutdown reason.
+    {stop, {shutdown, Outcome}, State};
+handle_cast(Unexpected, State) ->
+    ?SLOG(error, #{msg => "unexpected_cast", cast => Unexpected}),
+    {noreply, State}.
+
+%% No raw messages are expected: log whatever arrives and carry on.
+handle_info(Unexpected, State) ->
+    ?SLOG(error, #{msg => "unexpected_info", info => Unexpected}),
+    {noreply, State}.
+
+%% Resolve the backend from the retainer context and delegate to its
+%% `clear_expired/3' callback with the deadline and limit taken from `Opts'.
+clear_expired(Context, Opts) ->
+    Backend = emqx_retainer:backend_module(Context),
+    BackendState = emqx_retainer:backend_state(Context),
+    Deadline = maps:get(deadline, Opts),
+    Backend:clear_expired(BackendState, Deadline, maps:get(limit, Opts)).

+ 24 - 21
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -16,7 +16,6 @@
 
 -module(emqx_retainer_mnesia).
 
--behaviour(emqx_retainer).
 -behaviour(emqx_db_backup).
 
 -include("emqx_retainer.hrl").
@@ -24,7 +23,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 
-%% emqx_retainer callbacks
+-behaviour(emqx_retainer).
 -export([
     create/1,
     update/2,
@@ -32,14 +31,18 @@
     delete_message/2,
     store_retained/2,
     read_message/2,
-    page_read/4,
+    page_read/5,
     match_messages/3,
     delete_cursor/2,
-    clear_expired/1,
     clean/1,
     size/1
 ]).
 
+-behaviour(emqx_retainer_gc).
+-export([
+    clear_expired/3
+]).
+
 %% Internal exports (RPC)
 -export([
     do_populate_index_meta/1,
@@ -207,30 +210,31 @@ store_retained(State, Msg = #message{topic = Topic}) ->
             ok
     end.
 
-clear_expired(_) ->
-    case mria_rlog:role() of
-        core ->
-            clear_expired();
-        _ ->
-            ok
-    end.
-
-clear_expired() ->
-    NowMs = erlang:system_time(millisecond),
+clear_expired(_State, Deadline, Limit) ->
     S0 = ets_stream(?TAB_MESSAGE),
     S1 = emqx_utils_stream:filter(
         fun(#retained_message{expiry_time = ExpiryTime}) ->
-            ExpiryTime =/= 0 andalso ExpiryTime < NowMs
+            ExpiryTime =/= 0 andalso ExpiryTime < Deadline
         end,
         S0
     ),
     DirtyWriteIndices = dirty_indices(write),
-    emqx_utils_stream:foreach(
+    S2 = emqx_utils_stream:map(
         fun(RetainedMsg) ->
             delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
         end,
         S1
-    ).
+    ),
+    CountF = fun(_, N) -> N + 1 end,
+    case Limit of
+        all ->
+            NCleared = emqx_utils_stream:fold(CountF, 0, S2),
+            {_Complete = true, NCleared};
+        Num ->
+            {NCleared, SRest} = emqx_utils_stream:fold(CountF, 0, Num, S2),
+            Complete = SRest == [],
+            {Complete, NCleared}
+    end.
 
 delete_message(_State, Topic) ->
     Tokens = topic_to_tokens(Topic),
@@ -270,15 +274,14 @@ match_messages(_State, _Topic, {S0, BatchNum}) ->
 delete_cursor(_State, _Cursor) ->
     ok.
 
-page_read(_State, Topic, Page, Limit) ->
-    Now = erlang:system_time(millisecond),
+page_read(_State, Topic, Deadline, Page, Limit) ->
     S0 =
         case Topic of
             undefined ->
-                msg_stream(search_stream(undefined, ['#'], Now));
+                msg_stream(search_stream(undefined, ['#'], Deadline));
             _ ->
                 Tokens = topic_to_tokens(Topic),
-                msg_stream(search_stream(Tokens, Now))
+                msg_stream(search_stream(Tokens, Deadline))
         end,
     %% This is very inefficient, but we are limited with inherited API
     S1 = emqx_utils_stream:list(

+ 7 - 0
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -57,6 +57,13 @@ fields("retainer") ->
                 msg_clear_interval,
                 <<"0s">>
             )},
+        {msg_clear_limit,
+            sc(
+                pos_integer(),
+                msg_clear_limit,
+                50_000,
+                ?IMPORTANCE_HIDDEN
+            )},
         {flow_control,
             sc(
                 ?R_REF(flow_control),

+ 16 - 2
apps/emqx_retainer/src/emqx_retainer_sup.erl

@@ -16,15 +16,29 @@
 
 -module(emqx_retainer_sup).
 
--behaviour(supervisor).
-
 -export([start_link/0]).
 
+-export([start_gc/2]).
+
+-behaviour(supervisor).
 -export([init/1]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+%% Start a GC worker as a `temporary' child (never restarted by the
+%% supervisor) under the fixed child id `gc'.
+-spec start_gc(emqx_retainer:context(), emqx_retainer_gc:opts()) ->
+    supervisor:startchild_ret().
+start_gc(Context, Opts) ->
+    supervisor:start_child(?MODULE, #{
+        id => gc,
+        type => worker,
+        restart => temporary,
+        start => {emqx_retainer_gc, start_link, [Context, Opts]}
+    }).
+
+%%
+
 init([]) ->
     PoolSpec = emqx_pool_sup:spec([
         emqx_retainer_dispatcher,

+ 2 - 2
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -606,12 +606,12 @@ t_clear_expired(Config) ->
         ),
         timer:sleep(1000),
 
-        {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
+        {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, _Deadline = 0, 1, 10),
         ?assertEqual(5, erlang:length(List)),
 
         timer:sleep(4500),
 
-        {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
+        {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, _Deadline = 0, 1, 10),
         ?assertEqual(0, erlang:length(List2)),
 
         ok = emqtt:disconnect(C1)

+ 5 - 4
apps/emqx_retainer/test/emqx_retainer_dummy.erl

@@ -20,6 +20,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 
 -behaviour(emqx_retainer).
+-behaviour(emqx_retainer_gc).
 
 -export([
     create/1,
@@ -28,10 +29,10 @@
     delete_message/2,
     store_retained/2,
     read_message/2,
-    page_read/4,
+    page_read/5,
     match_messages/3,
     delete_cursor/2,
-    clear_expired/1,
+    clear_expired/3,
     clean/1,
     size/1
 ]).
@@ -60,13 +61,13 @@ store_retained(_Context, _Message) -> ok.
 
 read_message(_Context, _Topic) -> {ok, []}.
 
-page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}.
+page_read(_Context, _Topic, _Deadline, _Offset, _Limit) -> {ok, false, []}.
 
 match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}.
 
 delete_cursor(_Context, _Cursor) -> ok.
 
-clear_expired(_Context) -> ok.
+clear_expired(_Context, _Deadline, _Limit) -> {true, 0}.
 
 clean(_Context) -> ok.
 

+ 147 - 0
apps/emqx_retainer/test/emqx_retainer_gc_SUITE.erl

@@ -0,0 +1,147 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_gc_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
+%% Every testcase runs in both groups: without and with retainer indices.
+all() ->
+    [{group, Group} || Group <- [mnesia_without_indices, mnesia_with_indices]].
+
+%% Both groups share the same list of testcases, as discovered by
+%% emqx_common_test_helpers:all/1.
+groups() ->
+    Testcases = emqx_common_test_helpers:all(?MODULE),
+    [{Group, [], Testcases} || Group <- [mnesia_without_indices, mnesia_with_indices]].
+
+%% The groups differ only in the extra config: `mnesia_without_indices'
+%% overrides `retainer.backend.index_specs' with an empty list.
+init_per_group(mnesia_with_indices, Config) ->
+    init_cluster(_ExtraConf = "", Config);
+init_per_group(mnesia_without_indices, Config) ->
+    init_cluster(_ExtraConf = "retainer.backend.index_specs = []", Config).
+
+%% Start a two-node cluster of core nodes with the retainer enabled, periodic
+%% clearing initially off (`msg_clear_interval = 0s') and a clear limit of
+%% 100, then record the started nodes under `cluster' in the CT config.
+init_cluster(ExtraConf, Config) ->
+    BaseConf =
+        "retainer {"
+        "\n enable = true"
+        "\n msg_clear_interval = 0s"
+        "\n msg_clear_limit = 100"
+        "\n msg_expiry_interval = 3s"
+        "\n backend {"
+        "\n   type = built_in_database"
+        "\n   storage_type = disc"
+        "\n }"
+        "\n }",
+    Apps = [emqx, emqx_conf, {emqx_retainer, [BaseConf, ExtraConf]}],
+    NodeSpec = #{role => core, apps => Apps},
+    ClusterSpec = [{Name, NodeSpec} || Name <- [emqx_retainer_gc1, emqx_retainer_gc2]],
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    Nodes = emqx_cth_cluster:start(ClusterSpec, #{work_dir => WorkDir}),
+    [{cluster, Nodes} | Config].
+
+%% Tear down the cluster started by init_cluster/2.
+end_per_group(_Group, Config) ->
+    Nodes = ?config(cluster, Config),
+    emqx_cth_cluster:stop(Nodes).
+
+%%
+
+t_exclusive_gc(Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    ?check_trace(
+        begin
+            Interval = 1000,
+            ok = enable_clear_expired(Interval, Config),
+            ok = timer:sleep(round(Interval * 1.5)), % 1.5 intervals: room for one clear timer tick, not two
+            ok = disable_clear_expired(Config)
+        end,
+        fun(Trace) ->
+            %% Only one node should have run GC: expect a single trace event, emitted on N1.
+            ?assertMatch(
+                [#{?snk_meta := #{node := N1}}],
+                ?of_kind(emqx_retainer_cleared_expired, Trace)
+            )
+        end
+    ).
+
+t_limited_gc_runtime(Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    ?check_trace(
+        begin
+            ok = store_retained(_NMessages = 250, Config),
+            ok = timer:sleep(1000), % messages are published with 'Message-Expiry-Interval' => 1; presumably lets them expire first
+            {ok, {ok, _}} = ?wait_async_action(
+                enable_clear_expired(_Interval = 1000, Config),
+                #{?snk_kind := emqx_retainer_cleared_expired, complete := true}
+            ),
+            ok = disable_clear_expired(Config)
+        end,
+        fun(Trace) ->
+            %% Since `msg_clear_limit` is 100 (see init_cluster/2), we should observe 3 GC events for 250 messages:
+            ?assertMatch(
+                [
+                    #{complete := false, n_cleared := 100, ?snk_meta := #{node := N1}},
+                    #{complete := false, n_cleared := 100, ?snk_meta := #{node := N1}},
+                    #{complete := true, n_cleared := 50, ?snk_meta := #{node := N1}}
+                ],
+                ?of_kind(emqx_retainer_cleared_expired, Trace)
+            )
+        end
+    ).
+
+%% Publish `NMessages' retained messages on the first cluster node, one per
+%% topic `retained/<N>', each with 'Message-Expiry-Interval' set to 1.
+store_retained(NMessages, Config) ->
+    [Node | _] = ?config(cluster, Config),
+    ?ON(
+        Node,
+        begin
+            Publish = fun(N) ->
+                Topic = <<"retained/", (integer_to_binary(N))/binary>>,
+                Flags = #{retain => true},
+                Headers = #{properties => #{'Message-Expiry-Interval' => 1}},
+                emqx:publish(
+                    emqx_message:make(?MODULE, 0, Topic, <<"payload">>, Flags, Headers)
+                )
+            end,
+            lists:foreach(Publish, lists:seq(1, NMessages))
+        end
+    ).
+
+%% Set `msg_clear_interval' to `Interval' through the first cluster node.
+enable_clear_expired(Interval, Config) ->
+    [Node | _] = ?config(cluster, Config),
+    Update = #{<<"msg_clear_interval">> => Interval},
+    {ok, _} = ?ON(Node, emqx_retainer:update_config(Update)),
+    ok.
+
+%% Setting `msg_clear_interval' back to 0 turns periodic clearing off.
+disable_clear_expired(Config) ->
+    enable_clear_expired(_Interval = 0, Config).

+ 53 - 11
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -39,7 +39,10 @@
     next/1,
     consume/1,
     consume/2,
-    foreach/2
+    foreach/2,
+    fold/3,
+    fold/4,
+    sweep/2
 ]).
 
 %% Streams from ETS tables
@@ -108,6 +111,7 @@ map(F, S) ->
     end.
 
 %% @doc Make a stream by filtering the underlying stream with a predicate function.
+-spec filter(fun((X) -> boolean()), stream(X)) -> stream(X).
 filter(F, S) ->
     FilterNext = fun FilterNext(St) ->
         case next(St) of
@@ -124,16 +128,6 @@ filter(F, S) ->
     end,
     fun() -> FilterNext(S) end.
 
-%% @doc Consumes the stream and applies the given function to each element.
-foreach(F, S) ->
-    case next(S) of
-        [X | Rest] ->
-            F(X),
-            foreach(F, Rest);
-        [] ->
-            ok
-    end.
-
 %% @doc Drops N first elements from the stream
 -spec drop(non_neg_integer(), stream(T)) -> stream(T).
 drop(N, S) ->
@@ -297,6 +291,54 @@ consume(N, S, Acc) ->
             lists:reverse(Acc)
     end.
 
+%% @doc Walks the stream to its end, calling the given function on every
+%% element for its side effects; the function's results are discarded.
+-spec foreach(fun((X) -> _), stream(X)) -> ok.
+foreach(Fun, Stream) ->
+    case next(Stream) of
+        [] ->
+            ok;
+        [Elem | Tail] ->
+            _ = Fun(Elem),
+            foreach(Fun, Tail)
+    end.
+
+%% @doc Consumes the entire stream, threading an accumulator through the given
+%% function, which is called as `F(Element, Acc)' for each element in order.
+-spec fold(fun((X, Acc) -> Acc), Acc, stream(X)) -> Acc.
+fold(Fun, Acc0, Stream) ->
+    case next(Stream) of
+        [] ->
+            Acc0;
+        [Elem | Tail] ->
+            Acc1 = Fun(Elem, Acc0),
+            fold(Fun, Acc1, Tail)
+    end.
+
+%% @doc Folds at most the first N elements of the stream, threading an
+%% accumulator through the given function. Returns the accumulated value along
+%% with the rest of the stream. If the stream has fewer than N elements, the
+%% rest is `[]' (a.k.a. the empty stream). Once N elements have been folded the
+%% rest is returned without being evaluated, so it may still turn out empty.
+-spec fold(fun((X, Acc) -> Acc), Acc, non_neg_integer(), stream(X)) -> {Acc, stream(X)}.
+fold(_Fun, Acc, 0, Stream) ->
+    {Acc, Stream};
+fold(Fun, Acc0, N, Stream) when N > 0 ->
+    case next(Stream) of
+        [] ->
+            {Acc0, []};
+        [Elem | Tail] ->
+            Acc1 = Fun(Elem, Acc0),
+            fold(Fun, Acc1, N - 1, Tail)
+    end.
+
+%% @doc Like `consume/2', but throws the consumed values away: advances the
+%% stream by at most N elements and returns what is left (`[]' if the stream
+%% ran out first).
+-spec sweep(non_neg_integer(), stream(X)) -> stream(X).
+sweep(0, Stream) ->
+    Stream;
+sweep(N, Stream) when N > 0 ->
+    case next(Stream) of
+        [] ->
+            [];
+        [_Dropped | Tail] ->
+            sweep(N - 1, Tail)
+    end.
+
 %%
 
 -type select_result(Record, Cont) ::

+ 19 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -114,6 +114,25 @@ foreach_test() ->
         emqx_utils_stream:consume(emqx_utils_stream:mqueue(100))
     ).
 
+%% fold/3 consumes the whole stream: after dropping 1 and 2, the product of
+%% the remaining elements is expected.
+fold_test() ->
+    Source = emqx_utils_stream:list([1, 2, 3, 4, 5]),
+    Tail = emqx_utils_stream:drop(2, Source),
+    Product = emqx_utils_stream:fold(fun(X, Acc) -> Acc * X end, 1, Tail),
+    ?assertEqual(3 * 4 * 5, Product).
+
+%% fold/4 stops after N elements, so it can be applied to a repeated stream:
+%% 7 elements of the cycle 2, 4, 6 are summed.
+fold_n_test() ->
+    Doubled = emqx_utils_stream:map(fun(X) -> X * 2 end, emqx_utils_stream:list([1, 2, 3])),
+    Cycle = emqx_utils_stream:repeat(Doubled),
+    {Sum, _SRest} = emqx_utils_stream:fold(fun(X, Acc) -> Acc + X end, 0, _N = 7, Cycle),
+    ?assertEqual(2 + 4 + 6 + 2 + 4 + 6 + 2, Sum).
+
 chainmap_test() ->
     S = emqx_utils_stream:chainmap(
         fun(N) ->

+ 3 - 0
rel/i18n/emqx_retainer_schema.hocon

@@ -43,6 +43,9 @@ msg_clear_interval.desc:
 
 If `msg_clear_interval` is set to 0, that is, expired retained messages are not actively checked regularly, EMQX will only check and delete expired retained messages when preparing for delivery."""
 
+msg_clear_limit.desc:
+"""The maximum number of expired messages cleared in a single run every `msg_clear_interval`. Setting a reasonable limit can prevent the clearing process from running for too long and consuming too many resources."""
+
 msg_expiry_interval.desc:
 """Expired retained messages will not be delivered again, and a setting of 0 means that retained messages will never expire.