Thales Macedo Garitezi пре 2 година
родитељ
комит
7035b4c8b3

+ 157 - 0
apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl

@@ -0,0 +1,157 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_message_ds_gc_worker).
+
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-include("emqx_persistent_session_ds.hrl").
+
+%% API
+-export([
+    start_link/0,
+    gc/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% call/cast/info records
+-record(gc, {}).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% For testing or manual ops
+gc() ->
+    gen_server:call(?MODULE, #gc{}, infinity).
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+init(_Opts) ->
+    ensure_gc_timer(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#gc{}, _From, State) ->
+    maybe_gc(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, error, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#gc{}, State) ->
+    try_gc(),
+    ensure_gc_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+ensure_gc_timer() ->
+    Timeout = emqx_config:get([session_persistence, message_retention_period]),
+    _ = erlang:send_after(Timeout, self(), #gc{}),
+    ok.
+
+try_gc() ->
+    %% Only cores should run GC.
+    CoreNodes = mria_membership:running_core_nodelist(),
+    Res = global:trans(
+        {?MODULE, self()},
+        fun maybe_gc/0,
+        CoreNodes,
+        %% Note: we set retries to 1 here because, in rare occasions, GC might start at the
+        %% same time in more than one node, and each one will abort the other.  By allowing
+        %% one retry, at least one node will (hopefully) get to enter the transaction and
+        %% the other will abort.  If GC runs too fast, both nodes might run in sequence.
+        %% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
+        %% resource-wise.
+        _Retries = 1
+    ),
+    case Res of
+        aborted ->
+            ?tp(ds_message_gc_lock_taken, #{}),
+            ok;
+        ok ->
+            ok
+    end.
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+maybe_gc() ->
+    AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
+    NowMS = now_ms(),
+    RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
+    TimeThreshold = NowMS - RetentionPeriod,
+    maybe_create_new_generation(AllGens, TimeThreshold),
+    ?tp_span(
+        ps_message_gc,
+        #{},
+        begin
+            ExpiredGens =
+                maps:filter(
+                    fun(_GenId, #{until := Until}) ->
+                        is_number(Until) andalso Until =< TimeThreshold
+                    end,
+                    AllGens
+                ),
+            ExpiredGenIds = maps:keys(ExpiredGens),
+            lists:foreach(
+                fun(GenId) ->
+                    ok = emqx_ds:drop_generation(?PERSISTENT_MESSAGE_DB, GenId),
+                    ?tp(message_gc_generation_dropped, #{gen_id => GenId})
+                end,
+                ExpiredGenIds
+            )
+        end
+    ).
+
+maybe_create_new_generation(AllGens, TimeThreshold) ->
+    NeedNewGen =
+        lists:all(
+            fun({_GenId, #{created_at := CreatedAt}}) ->
+                CreatedAt =< TimeThreshold
+            end,
+            maps:to_list(AllGens)
+        ),
+    case NeedNewGen of
+        false ->
+            ?tp(ps_message_gc_too_early, #{}),
+            ok;
+        true ->
+            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
+            ?tp(ps_message_gc_added_gen, #{})
+    end.

+ 3 - 2
apps/emqx/src/emqx_persistent_session_ds_sup.erl

@@ -48,13 +48,14 @@ init(Opts) ->
 
 
 do_init(_Opts) ->
 do_init(_Opts) ->
     SupFlags = #{
     SupFlags = #{
-        strategy => rest_for_one,
+        strategy => one_for_one,
         intensity => 10,
         intensity => 10,
         period => 2,
         period => 2,
         auto_shutdown => never
         auto_shutdown => never
     },
     },
     CoreChildren = [
     CoreChildren = [
-        worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])
+        worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []),
+        worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, [])
     ],
     ],
     Children =
     Children =
         case mria_rlog:role() of
         case mria_rlog:role() of

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -1853,6 +1853,14 @@ fields("session_persistence") ->
                     desc => ?DESC(session_ds_session_gc_batch_size)
                     desc => ?DESC(session_ds_session_gc_batch_size)
                 }
                 }
             )},
             )},
+        {"message_retention_period",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"1d">>,
+                    desc => ?DESC(session_ds_message_retention_period)
+                }
+            )},
         {"force_persistence",
         {"force_persistence",
             sc(
             sc(
                 boolean(),
                 boolean(),

+ 85 - 2
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -19,6 +19,7 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
 
 -compile(export_all).
 -compile(export_all).
@@ -45,10 +46,20 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
     Cluster = cluster(),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
     [{nodes, Nodes} | Config];
+init_per_testcase(t_message_gc = TestCase, Config) ->
+    Opts = #{
+        extra_emqx_conf =>
+            "\n  session_persistence.message_retention_period = 1s"
+            "\n  session_persistence.storage.builtin.n_shards = 3"
+    },
+    common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
 init_per_testcase(TestCase, Config) ->
 init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config, _Opts = #{}).
+
+common_init_per_testcase(TestCase, Config, Opts) ->
     ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     Apps = emqx_cth_suite:start(
     Apps = emqx_cth_suite:start(
-        app_specs(),
+        app_specs(Opts),
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
     ),
     [{apps, Apps} | Config].
     [{apps, Apps} | Config].
@@ -379,6 +390,66 @@ t_publish_empty_topic_levels(_Config) ->
         emqtt:stop(Pub)
         emqtt:stop(Pub)
     end.
     end.
 
 
+t_message_gc_too_young(_Config) ->
+    %% Check that GC doesn't attempt to create a new generation if there are fresh enough
+    %% generations around.  The stability of this test relies on the default value for
+    %% message retention being long enough.  Currently, the default is 1 hour.
+    ?check_trace(
+        ok = emqx_persistent_message_ds_gc_worker:gc(),
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_message_gc(Config) ->
+    %% Check that, after GC runs, a new generation is created, retaining messages, and
+    %% older messages no longer are accessible.
+    NShards = ?config(n_shards, Config),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            %% ensure some messages are in the first generation
+            ?force_ordering(
+                #{?snk_kind := inserted_batch},
+                #{?snk_kind := ps_message_gc_added_gen}
+            ),
+            Msgs0 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/baz">>, <<"2">>, 1)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
+            ?tp(inserted_batch, #{}),
+            {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
+
+            Now = emqx_message:timestamp_now(),
+            Msgs1 = [
+                message(<<"foo/bar">>, <<"3">>, Now + 100),
+                message(<<"foo/baz">>, <<"4">>, Now + 101)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
+
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
+                infinity
+            ),
+
+            TopicFilter = emqx_topic:words(<<"#">>),
+            StartTime = 0,
+            Msgs = consume(TopicFilter, StartTime),
+            %% only "1" and "2" should have been GC'ed
+            ?assertEqual(
+                sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
+                sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%
 %%
 
 
 connect(ClientId, CleanStart, EI) ->
 connect(ClientId, CleanStart, EI) ->
@@ -438,9 +509,13 @@ publish(Node, Message) ->
     erpc:call(Node, emqx, publish, [Message]).
     erpc:call(Node, emqx, publish, [Message]).
 
 
 app_specs() ->
 app_specs() ->
+    app_specs(_Opts = #{}).
+
+app_specs(Opts) ->
+    ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
     [
     [
         emqx_durable_storage,
         emqx_durable_storage,
-        {emqx, "session_persistence {enable = true}"}
+        {emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
     ].
     ].
 
 
 cluster() ->
 cluster() ->
@@ -459,3 +534,11 @@ clear_db() ->
     mria:stop(),
     mria:stop(),
     ok = mnesia:delete_schema([node()]),
     ok = mnesia:delete_schema([node()]),
     ok.
     ok.
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.

+ 1 - 0
changes/ce/feat-12338.en.md

@@ -0,0 +1 @@
+Added time-based message garbage collection to the RocksDB-based persistent session backend.

+ 3 - 0
rel/i18n/emqx_schema.hocon

@@ -1603,5 +1603,8 @@ The session will query the DB for the new messages when the value of `FreeSpace`
 
 
 `FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
 `FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
 
 
+session_ds_message_retention_period.desc:
+"""The minimum amount of time that messages should be retained for.  After messages have been in storage for at least this period of time, they'll be dropped."""
+
 
 
 }
 }