Przeglądaj źródła

feat(ds): add session gc process

Fixes https://emqx.atlassian.net/browse/EMQX-9744
Thales Macedo Garitezi 2 lat temu
rodzic
commit
880f5e8f89

+ 183 - 5
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -48,12 +48,36 @@ init_per_testcase(TestCase, Config) when
         {nodes, Nodes}
         | Config
     ];
+init_per_testcase(t_session_gc = TestCase, Config) ->
+    Opts = #{
+        n => 3,
+        roles => [core, core, replicant],
+        extra_emqx_conf =>
+            "\n session_persistence {"
+            "\n   last_alive_update_interval = 500ms "
+            "\n   session_gc_interval = 2s "
+            "\n   session_gc_batch_size = 1 "
+            "\n }"
+    },
+    Cluster = cluster(Opts),
+    ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
+    Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
+    [
+        {cluster, Cluster},
+        {node_specs, NodeSpecs},
+        {cluster_opts, ClusterOpts},
+        {nodes, Nodes},
+        {gc_interval, timer:seconds(2)}
+        | Config
+    ];
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
-    TestCase =:= t_session_unsubscription_idempotency
+    TestCase =:= t_session_unsubscription_idempotency;
+    TestCase =:= t_session_gc
 ->
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
@@ -67,20 +91,32 @@ end_per_testcase(_TestCase, _Config) ->
 %% Helper fns
 %%------------------------------------------------------------------------------
 
-cluster(#{n := N}) ->
-    Spec = #{role => core, apps => app_specs()},
+cluster(#{n := N} = Opts) ->
+    MkRole = fun(M) ->
+        case maps:get(roles, Opts, undefined) of
+            undefined ->
+                core;
+            Roles ->
+                lists:nth(M, Roles)
+        end
+    end,
+    MkSpec = fun(M) -> #{role => MkRole(M), apps => app_specs(Opts)} end,
     lists:map(
         fun(M) ->
             Name = list_to_atom("ds_SUITE" ++ integer_to_list(M)),
-            {Name, Spec}
+            {Name, MkSpec(M)}
         end,
         lists:seq(1, N)
     ).
 
 app_specs() ->
+    app_specs(_Opts = #{}).
+
+app_specs(Opts) ->
+    ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
     [
         emqx_durable_storage,
-        {emqx, "session_persistence = {enable = true}"}
+        {emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf}
     ].
 
 get_mqtt_port(Node, Type) ->
@@ -143,6 +179,29 @@ restart_node(Node, NodeSpec) ->
 is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) ->
     EI > 0.
 
+list_all_sessions(Node) ->
+    erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []).
+
+list_all_subscriptions(Node) ->
+    erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []).
+
+list_all_pubranges(Node) ->
+    erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
+
+prop_only_cores_run_gc(CoreNodes) ->
+    {"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}.
+prop_only_cores_run_gc(Trace, CoreNodes) ->
+    GCNodes = lists:usort([
+        N
+     || #{
+            ?snk_kind := K,
+            ?snk_meta := #{node := N}
+        } <- Trace,
+        lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]),
+        N =/= node()
+    ]),
+    ?assertEqual(lists:usort(CoreNodes), GCNodes).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -469,3 +528,122 @@ do_t_session_expiration(_Config, Opts) ->
         []
     ),
     ok.
+
+t_session_gc(Config) ->
+    GCInterval = ?config(gc_interval, Config),
+    [Node1, Node2, Node3] = Nodes = ?config(nodes, Config),
+    CoreNodes = [Node1, Node2],
+    [
+        Port1,
+        Port2,
+        Port3
+    ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
+    CommonParams = #{
+        clean_start => false,
+        proto_ver => v5
+    },
+    StartClient = fun(ClientId, Port, ExpiryInterval) ->
+        Params = maps:merge(CommonParams, #{
+            clientid => ClientId,
+            port => Port,
+            properties => #{'Session-Expiry-Interval' => ExpiryInterval}
+        }),
+        Client = start_client(Params),
+        {ok, _} = emqtt:connect(Client),
+        Client
+    end,
+
+    ?check_trace(
+        begin
+            ClientId0 = <<"session_gc0">>,
+            Client0 = StartClient(ClientId0, Port1, 30),
+
+            ClientId1 = <<"session_gc1">>,
+            Client1 = StartClient(ClientId1, Port2, 1),
+
+            ClientId2 = <<"session_gc2">>,
+            Client2 = StartClient(ClientId2, Port3, 1),
+
+            lists:foreach(
+                fun(Client) ->
+                    Topic = <<"some/topic">>,
+                    Payload = <<"hi">>,
+                    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client, Topic, ?QOS_1),
+                    {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
+                    ok
+                end,
+                [Client0, Client1, Client2]
+            ),
+
+            %% Clients are still alive; no session is garbage collected.
+            Res0 = ?block_until(
+                #{
+                    ?snk_kind := ds_session_gc,
+                    ?snk_span := {complete, _},
+                    ?snk_meta := #{node := N}
+                } when
+                    N =/= node(),
+                3 * GCInterval + 1_000
+            ),
+            ?assertMatch({ok, _}, Res0),
+            {ok, #{?snk_meta := #{time := T0}}} = Res0,
+            Sessions0 = list_all_sessions(Node1),
+            Subs0 = list_all_subscriptions(Node1),
+            ?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}),
+            ?assertEqual(3, map_size(Subs0), #{subs => Subs0}),
+
+            %% Now we disconnect 2 of them; only those should be GC'ed.
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqtt:stop(Client1),
+                    #{?snk_kind := terminate},
+                    1_000
+                )
+            ),
+            ct:pal("disconnected client1"),
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqtt:stop(Client2),
+                    #{?snk_kind := terminate},
+                    1_000
+                )
+            ),
+            ct:pal("disconnected client2"),
+            ?assertMatch(
+                {ok, _},
+                ?block_until(
+                    #{
+                        ?snk_kind := ds_session_gc_cleaned,
+                        ?snk_meta := #{node := N, time := T},
+                        session_ids := [ClientId1]
+                    } when
+                        N =/= node() andalso T > T0,
+                    4 * GCInterval + 1_000
+                )
+            ),
+            ?assertMatch(
+                {ok, _},
+                ?block_until(
+                    #{
+                        ?snk_kind := ds_session_gc_cleaned,
+                        ?snk_meta := #{node := N, time := T},
+                        session_ids := [ClientId2]
+                    } when
+                        N =/= node() andalso T > T0,
+                    4 * GCInterval + 1_000
+                )
+            ),
+            Sessions1 = list_all_sessions(Node1),
+            Subs1 = list_all_subscriptions(Node1),
+            ?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}),
+            ?assertEqual(1, map_size(Subs1), #{subs => Subs1}),
+
+            ok
+        end,
+        [
+            prop_only_cores_run_gc(CoreNodes)
+        ]
+    ),
+    ok.

+ 11 - 1
apps/emqx/src/emqx_cm_sup.erl

@@ -47,7 +47,17 @@ init([]) ->
     Locker = child_spec(emqx_cm_locker, 5000, worker),
     Registry = child_spec(emqx_cm_registry, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
-    {ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}.
+    DSSessionGCSup = child_spec(emqx_persistent_session_ds_gc_sup, infinity, supervisor),
+    Children =
+        [
+            Banned,
+            Flapping,
+            Locker,
+            Registry,
+            Manager,
+            DSSessionGCSup
+        ],
+    {ok, {SupFlags, Children}}.
 
 %%--------------------------------------------------------------------
 %% Internal functions

+ 13 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -63,6 +63,9 @@
 %% session table operations
 -export([create_tables/0]).
 
+%% internal export used by session GC process
+-export([destroy_session/1]).
+
 %% Remove me later (satisfy checks for an unused BPAPI)
 -export([
     do_open_iterator/3,
@@ -986,8 +989,16 @@ expiry_interval(ConnInfo) ->
 list_all_sessions() ->
     DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),
     ConnInfo = #{},
-    Sessions = lists:map(
-        fun(SessionID) -> {SessionID, session_open(SessionID, ConnInfo)} end,
+    Sessions = lists:filtermap(
+        fun(SessionID) ->
+            Sess = session_open(SessionID, ConnInfo),
+            case Sess of
+                false ->
+                    false;
+                _ ->
+                    {true, {SessionID, Sess}}
+            end
+        end,
         DSSessionIds
     ),
     maps:from_list(Sessions).

+ 78 - 0
apps/emqx/src/emqx_persistent_session_ds_gc_sup.erl

@@ -0,0 +1,78 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_ds_gc_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([
+    start_link/0
+]).
+
+%% `supervisor' API
+-export([
+    init/1
+]).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%--------------------------------------------------------------------------------
+%% `supervisor' API
+%%--------------------------------------------------------------------------------
+
+init(Opts) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            do_init(Opts);
+        false ->
+            ignore
+    end.
+
+do_init(_Opts) ->
+    SupFlags = #{
+        strategy => rest_for_one,
+        intensity => 10,
+        period => 2,
+        auto_shutdown => never
+    },
+    CoreChildren = [
+        worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])
+    ],
+    Children =
+        case mria_rlog:role() of
+            core -> CoreChildren;
+            replicant -> []
+        end,
+    {ok, {SupFlags, Children}}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+worker(Id, Mod, Args) ->
+    #{
+        id => Id,
+        start => {Mod, start_link, Args},
+        type => worker,
+        restart => permanent,
+        shutdown => 10_000,
+        significant => false
+    }.

+ 161 - 0
apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl

@@ -0,0 +1,161 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_ds_gc_worker).
+
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-include("emqx_persistent_session_ds.hrl").
+
+%% API
+-export([
+    start_link/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% call/cast/info records
+-record(gc, {}).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+init(_Opts) ->
+    ensure_gc_timer(),
+    State = #{},
+    {ok, State}.
+
+handle_call(_Call, _From, State) ->
+    {reply, error, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#gc{}, State) ->
+    try_gc(),
+    ensure_gc_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+ensure_gc_timer() ->
+    Timeout = emqx_config:get([session_persistence, session_gc_interval]),
+    _ = erlang:send_after(Timeout, self(), #gc{}),
+    ok.
+
+try_gc() ->
+    %% Only cores should run GC.
+    CoreNodes = mria_membership:running_core_nodelist(),
+    Res = global:trans(
+        {?MODULE, self()},
+        fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end,
+        CoreNodes,
+        %% Note: we set retries to 1 here because, in rare occasions, GC might start at the
+        %% same time in more than one node, and each one will abort the other.  By allowing
+        %% one retry, at least one node will (hopefully) get to enter the transaction and
+        %% the other will abort.  If GC runs too fast, both nodes might run in sequence.
+        %% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
+        %% resource-wise.
+        _Retries = 1
+    ),
+    case Res of
+        aborted ->
+            ?tp(ds_session_gc_lock_taken, #{}),
+            ok;
+        ok ->
+            ok
+    end.
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+start_gc() ->
+    do_gc(more).
+
+zombie_session_ms() ->
+    NowMS = now_ms(),
+    GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
+    BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
+    TimeThreshold = max(GCInterval, BumpInterval) * 3,
+    ets:fun2ms(
+        fun(
+            #session{
+                id = DSSessionId,
+                last_alive_at = LastAliveAt,
+                conninfo = #{expiry_interval := EI}
+            }
+        ) when
+            LastAliveAt + EI + TimeThreshold =< NowMS
+        ->
+            DSSessionId
+        end
+    ).
+
+do_gc(more) ->
+    GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
+    MS = zombie_session_ms(),
+    {atomic, Next} = mria:transaction(?DS_MRIA_SHARD, fun() ->
+        Res = mnesia:select(?SESSION_TAB, MS, GCBatchSize, write),
+        case Res of
+            '$end_of_table' ->
+                done;
+            {[], Cont} ->
+                %% since `GCBatchsize' is just a "recommendation" for `select', we try only
+                %% _once_ the continuation and then stop if it yields nothing, to avoid a
+                %% dead loop.
+                case mnesia:select(Cont) of
+                    '$end_of_table' ->
+                        done;
+                    {[], _Cont} ->
+                        done;
+                    {DSSessionIds0, _Cont} ->
+                        do_gc_(DSSessionIds0),
+                        more
+                end;
+            {DSSessionIds0, _Cont} ->
+                do_gc_(DSSessionIds0),
+                more
+        end
+    end),
+    do_gc(Next);
+do_gc(done) ->
+    ok.
+
+do_gc_(DSSessionIds) ->
+    lists:foreach(fun emqx_persistent_session_ds:destroy_session/1, DSSessionIds),
+    ?tp(ds_session_gc_cleaned, #{session_ids => DSSessionIds}),
+    ok.

+ 16 - 0
apps/emqx/src/emqx_schema.erl

@@ -1789,6 +1789,22 @@ fields("session_persistence") ->
                     desc => ?DESC(session_ds_last_alive_update_interval)
                 }
             )},
+        {"session_gc_interval",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"10m">>,
+                    desc => ?DESC(session_ds_session_gc_interval)
+                }
+            )},
+        {"session_gc_batch_size",
+            sc(
+                pos_integer(),
+                #{
+                    default => 100,
+                    desc => ?DESC(session_ds_session_gc_batch_size)
+                }
+            )},
         {"force_persistence",
             sc(
                 boolean(),

+ 0 - 2
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -510,8 +510,6 @@ t_persist_on_disconnect(Config) ->
     ?assertEqual(0, client_info(session_present, Client2)),
     ok = emqtt:disconnect(Client2).
 
-t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config);
-t_process_dies_session_expires('end', _Config) -> ok.
 t_process_dies_session_expires(Config) ->
     %% Emulate an error in the connect process,
     %% or that the node of the process goes down.

+ 6 - 0
rel/i18n/emqx_schema.hocon

@@ -1571,4 +1571,10 @@ session_builtin_n_shards.desc:
 session_storage_backend_builtin.desc:
 """Builtin session storage backend utilizing embedded RocksDB key-value store."""
 
+session_ds_session_gc_interval.desc:
+"""The interval at which session garbage collection is executed for persistent sessions."""
+
+session_ds_session_gc_batch_size.desc:
+"""The size of each batch of expired persistent sessions to be garbage collected per iteration."""
+
 }